
[SPARK-25212][SQL] Support Filter in ConvertToLocalRelation #22205

Closed
wants to merge 28 commits

Conversation

bogdanrdc
Contributor

What changes were proposed in this pull request?

Support Filter in ConvertToLocalRelation, similar to how Project works.
Additionally, run ConvertToLocalRelation earlier in the Optimizer to simplify the plan. This helps very short queries, which are often queries on local relations.
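
In outline, the new case pattern-matches a Filter over a LocalRelation and evaluates the predicate row by row over the already-local rows. A minimal sketch, assuming Catalyst's InterpretedPredicate API (the diff further down shows the exact guard):

```scala
case Filter(condition, LocalRelation(output, data, isStreaming))
    if !hasUnevaluableExpr(condition) =>
  // Bind the predicate to the relation's output attributes, then drop the
  // Filter node by materializing the surviving rows into a new LocalRelation.
  val predicate = InterpretedPredicate.create(condition, output)
  predicate.initialize(0)
  LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming)
```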

How was this patch tested?

New test. Manual benchmark.

@bogdanrdc
Contributor Author

To justify running ConvertToLocalRelation earlier: this code takes 10 ms after this change vs. 45 ms before:

    val manyCols = (0 until 1000).map { i => s"value as col$i" }
    val local = Seq(1, 2, 3).toDS().selectExpr(manyCols: _*).where($"col3" > 4)
    val plan = local.queryExecution.analyzed
    val numIter = 1000
    val start = System.nanoTime()
    for (i <- 0 until numIter) {
      spark.sessionState.optimizer.execute(plan)
    }
    val durationMs = (System.nanoTime() - start) / numIter / 1000000
    println(s"duration $durationMs")

@bogdanrdc
Contributor Author

cc @gatorsmile @hvanhovell

@SparkQA

SparkQA commented Aug 23, 2018

Test build #95162 has finished for PR 22205 at commit d7e49e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 24, 2018

Test build #95204 has finished for PR 22205 at commit 326e5d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -130,6 +130,10 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
// since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once,
CombineUnions) ::
// run this once earlier. this might simplify the plan and reduce cost of optimizer
Member

Hi @bogdanrdc, could you explain more about this? Why is it necessary?

Contributor Author

@bogdanrdc Aug 27, 2018

It makes the optimizer faster for short queries; see the code above. Without this change, a query such as Filter(LocalRelation) would go through all the heavy optimizer rules. With this change, the query collapses to just a LocalRelation earlier and doesn't trigger many rules.

Contributor

Can you put your explanation above into the code comment?

@@ -1349,6 +1353,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {

case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
LocalRelation(output, data.take(limit), isStreaming)

case Filter(condition, LocalRelation(output, data, isStreaming))
if !hasUnevaluableExpr(condition) =>
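
The helper used in the guard is not shown in this hunk. A plausible shape, sketched under the assumption that it checks for Catalyst's Unevaluable marker trait:

```scala
// True if the condition contains an expression that cannot be evaluated
// directly, e.g. a subquery placeholder. AttributeReference is excluded
// because the predicate binds attributes to input rows before evaluating.
private def hasUnevaluableExpr(expr: Expression): Boolean = {
  expr.find(e => e.isInstanceOf[Unevaluable] && !e.isInstanceOf[AttributeReference]).isDefined
}
```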
Member

If the condition is non-deterministic, the values will always be the same after the plans are optimized: a DataFrame with non-deterministic filters will always return the same result.

Contributor Author

That is OK, right?

Member

We block all these optimizations in the other optimizer rules, e.g., ConstantFolding. None of the non-deterministic expressions are foldable.

Contributor Author

ConvertToLocalRelation was already doing this for Project, so I assumed it's OK. What I did is exactly how Project(LocalRelation) works.

Member

I am fine with introducing this change since we already do it for Project(LocalRelation).

Contributor

I suppose it is fine in this case. The only thing is that it violates the contract of the optimizer: it should not change the results of a query.
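
To make the debated behavior concrete, a hypothetical illustration (assumes a spark-shell session, so spark.implicits._ is in scope, and that the new rule fires on this plan):

```scala
import org.apache.spark.sql.functions.rand

// With Filter(LocalRelation) folded at optimization time, a non-deterministic
// predicate is evaluated once while optimizing, not once per execution.
val df = Seq(1, 2, 3, 4, 5).toDF("a").where(rand() < 0.5)
df.collect()  // some subset of the rows
df.collect()  // the same subset again: the filter does not re-run
```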

@gatorsmile
Member

Many test cases will become invalid after this rule is applied, since they are built on LocalRelation. Thus, how about turning off the ConvertToLocalRelation rule in our sql/hive and sql/core modules using the conf spark.sql.optimizer.excludedRules?
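
For instance, something along these lines in the shared test SparkConf (a sketch: the rule is identified by its fully qualified class name):

```scala
new SparkConf()
  .set("spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
```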

@bogdanrdc
Contributor Author

Turning the whole rule off will also disable the existing Project(LocalRelation) handling. By invalid, do you mean they will no longer test what they are supposed to test?

@gatorsmile
Member

gatorsmile commented Aug 27, 2018

It would be safer to turn off this rule, since it will skip the actual query execution. Normally, the tests are introduced for testing end-to-end scenarios instead of testing this rule.

@hvanhovell
Contributor

@gatorsmile what exactly are you afraid of? We could check which tests are affected. Also, do you want to disable this for testing only?

@gatorsmile
Member

Yes. Disable this rule for testing only.

@SparkQA

SparkQA commented Aug 28, 2018

Test build #95344 has finished for PR 22205 at commit 9ab1fa0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

new SparkConf()
.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, excludeRules.mkString(","))
Member

The same thing is needed for TestHive.

import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_EXCLUDED_RULES
import org.apache.spark.util.Utils
Member

Not needed?

@SparkQA

SparkQA commented Aug 28, 2018

Test build #95351 has finished for PR 22205 at commit 18f6dcd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 28, 2018

Test build #95356 has finished for PR 22205 at commit ae148c8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sujith71955 and others added 9 commits August 28, 2018 19:35
…proper while submitting spark yarn application

## What changes were proposed in this pull request?
When the yarn.nodemanager.resource.memory-mb or yarn.scheduler.maximum-allocation-mb
memory assignment is insufficient, Spark always reports an error asking to adjust
yarn.scheduler.maximum-allocation-mb, even though the message shows the memory value
of the yarn.nodemanager.resource.memory-mb parameter. As the error message is a bit misleading to the user, we can modify it to match the executor memory validation message.

Definition of **yarn.nodemanager.resource.memory-mb**:
Amount of physical memory, in MB, that can be allocated for containers. It is the amount of memory YARN can utilize on this node, so this property should be lower than the total memory of that machine.
**yarn.scheduler.maximum-allocation-mb**:
Defines the maximum memory allocation available for a container, in MB.
The RM can only allocate memory to containers in increments of yarn.scheduler.minimum-allocation-mb, without exceeding yarn.scheduler.maximum-allocation-mb, and it should not be more than the total allocated memory of the node.

## How was this patch tested?
Manually tested in an HDFS-YARN cluster.

Closes apache#22199 from sujith71955/maste_am_log.

Authored-by: s71955 <sujithchacko.2010@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
…cated records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

When there are missing offsets, Kafka v2 source may return duplicated records when `failOnDataLoss=false` because it doesn't skip missing offsets.

This PR fixes the issue and also adds regression tests for all Kafka readers.

## How was this patch tested?

New tests.

Closes apache#22207 from zsxwing/SPARK-25214.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
…register itself from rm

## What changes were proposed in this pull request?

When using older Spark releases, a use case generated a huge codegen file which hit the limitation `Constant pool has grown past JVM limit of 0xFFFF`. In this situation, it should fail immediately. But the diagnostics message sent to the RM was too large, so the ApplicationMaster hung and the RM's ZKStateStore crashed. For Spark 2.3 and later releases the codegen limitation has been removed, but there may still be uncaught exceptions with oversized error messages that cause such a problem.

This PR aims to cut down the diagnostics message size.

## How was this patch tested?

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes apache#22180 from yaooqinn/SPARK-25174.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?

`parallelize` uses integer multiplication to determine the split indices. It might cause integer overflow.
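
A hypothetical illustration of the overflow (the variable names are illustrative, not the patch's):

```scala
// Split start positions computed in Int arithmetic wrap around once
// i * length exceeds Int.MaxValue; widening to Long first avoids it.
val length = 100000000                                  // 1e8 elements
val numSlices = 100
val i = 30
val badStart = i * length / numSlices                   // wraps to a negative index
val goodStart = (i.toLong * length / numSlices).toInt   // correct: 30000000
```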

## How was this patch tested?

unit test

Closes apache#22225 from mengxr/SPARK-25234.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes apache#22189 from movrsprbp/patch-1.

Authored-by: jaroslav chládek <mastermism@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?

[SPARK-25095](apache@ad45299) introduced `ambiguous reference to overloaded definition`

```
[error] /Users/d_tsai/dev/apache-spark/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:242: ambiguous reference to overloaded definition,
[error] both method addTaskCompletionListener in class TaskContext of type [U](f: org.apache.spark.TaskContext => U)org.apache.spark.TaskContext
[error] and  method addTaskCompletionListener in class TaskContext of type (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error] match argument types (org.apache.spark.TaskContext => Unit)
[error]           context.addTaskCompletionListener(_ => server.close())
[error]                   ^
[error] one error found
[error] Compile failed at Aug 24, 2018 1:56:06 PM [31.582s]
```
which fails the Scala 2.12 branch build.
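
Given the two overloads shown in the error, pinning the lambda's type parameter disambiguates the call; a sketch of that kind of fix (the actual patch may differ):

```scala
// With U fixed to Unit, overload resolution picks the
// (TaskContext => U) variant under Scala 2.12.
context.addTaskCompletionListener[Unit] { _ => server.close() }
```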

## How was this patch tested?

Existing tests

Closes apache#22229 from dbtsai/fix-2.12-build.

Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?

An RDD is created using LabeledPoint, but the comment reads LabeledPoint(feature, label).
Although in the method ChiSquareTest.test the second parameter is the feature and the third is the label, it is better to write the label before the feature here, because an RDD created from LabeledPoint actually holds (label, feature) pairs.
It is now changed to LabeledPoint(label, feature).

The comments in the Scala and Java examples have the same typos.

## How was this patch tested?

tested

https://issues.apache.org/jira/browse/SPARK-24688

Author: Weizhe Huang 492816239qq.com

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes apache#21665 from uzmijnlm/my_change.

Authored-by: Huangweizhe <huangweizhe@bbdservice.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
…turn duplicated records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

This is a follow up PR for apache#22207 to fix a potential flaky test. `processAllAvailable` doesn't work for continuous processing so we should not use it for a continuous query.

## How was this patch tested?

Jenkins.

Closes apache#22230 from zsxwing/SPARK-25214-2.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?

Improved the documentation for the datetime functions in `org.apache.spark.sql.functions` by adding details about the supported column input types, the column return type, behaviour on invalid input, supporting examples and clarifications.

## How was this patch tested?

Manually testing each of the datetime functions with different input to ensure that the corresponding Javadoc/Scaladoc matches the behaviour of the function. Successfully ran the `unidoc` SBT process.

Closes apache#20901 from abradbury/SPARK-23792.

Authored-by: Adam Bradbury <abradbury@users.noreply.github.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
HyukjinKwon and others added 10 commits August 28, 2018 19:35
## What changes were proposed in this pull request?

`__version__` in `setup.py` is currently being read dynamically by `exec`, so the linter complains. Better to just switch it off for this line for now.

**Before:**

```bash
$ python -m flake8 . --count --select=E9,F82 --show-source --statistics
./setup.py:37:11: F821 undefined name '__version__'
VERSION = __version__
          ^
1     F821 undefined name '__version__'
1
```

**After:**

```bash
$ python -m flake8 . --count --select=E9,F82 --show-source --statistics
0
```

## How was this patch tested?

Manually tested.

Closes apache#22235 from HyukjinKwon/SPARK-23698.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
… global limit

## What changes were proposed in this pull request?

This is based on the discussion https://github.com/apache/spark/pull/16677/files#r212805327.

As the SQL standard doesn't mandate that a nested ORDER BY followed by a LIMIT has to respect that ordering clause, this patch removes the `child.outputOrdering` check.

## How was this patch tested?

Unit tests.

Closes apache#22239 from viirya/improve-global-limit-parallelism-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… to configure the capacity of fast aggregation.

## What changes were proposed in this pull request?

This PR adds a configuration parameter to configure the capacity of fast aggregation.
Performance comparison:

```
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
 Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
 Aggregate w multiple keys:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
 fasthash = default                            5612 / 5882          3.7         267.6       1.0X
 fasthash = config                             3586 / 3595          5.8         171.0       1.6X

```

## How was this patch tested?
Existing test cases.

Closes apache#21931 from heary-cao/FastHashCapacity.

Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

This PR adds a unit test for OpenHashMap; this can help developers distinguish between 0/0.0/0L and null.

## How was this patch tested?

Closes apache#22241 from 10110346/openhashmap.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?

Fix flaky synchronization in Kafka tests - we need to use the scan config that was persisted rather than reconstructing it to identify the stream's current configuration.

We caught most instances of this in the original PR, but this one slipped through.

## How was this patch tested?

n/a

Closes apache#22245 from jose-torres/fixflake.

Authored-by: Jose Torres <torres.joseph.f+github@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
…filesystem explicitly specified by the user

## What changes were proposed in this pull request?

Our HDFS cluster is configured with 5 nameservices: `nameservices1`, `nameservices2`, `nameservices3`, `nameservices-dev1` and `nameservices4`, but `nameservices-dev1` is unstable. So sometimes an error occurs, causing the entire job to fail since [SPARK-24149](https://issues.apache.org/jira/browse/SPARK-24149):

![image](https://user-images.githubusercontent.com/5399861/42434779-f10c48fc-8386-11e8-98b0-4d9786014744.png)

I think it's best to add a switch here.

## How was this patch tested?

manual tests

Closes apache#21734 from wangyum/SPARK-24149.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?

Updated documentation for Spark on Kubernetes for the upcoming 2.4.0.

Please review http://spark.apache.org/contributing.html before opening a pull request.

mccheah erikerlandson

Closes apache#22224 from liyinan926/master.

Authored-by: Yinan Li <ynli@google.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR excludes Python UDF filters in FileSourceStrategy so that they don't cause the ExtractPythonUDF rule to throw an exception. It doesn't make sense to pass Python UDF filters to FileSourceStrategy anyway, because they cannot be used as push-down filters.

## How was this patch tested?
Add a new regression test

Closes apache#22104 from icexelloss/SPARK-24721-udf-filter.

Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…nd SocketAuthHelper

## What changes were proposed in this pull request?

Make sure TransportServer and SocketAuthHelper close the resources for all types of errors.

## How was this patch tested?

Jenkins

Closes apache#22210 from zsxwing/SPARK-25218.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?

As users may use Kafka transactions to write data, the offsets in Kafka will be non-consecutive: the log will contain transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. However, as `seekToEnd` may move the offset to one of these missing offsets, there are 4 possible corner cases we need to support:

- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch

They are all covered by the new unit tests.
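
For context, a reader exercising these cases might be configured along these lines (broker address and topic name are placeholders; `kafka.`-prefixed options are passed through to the underlying consumer):

```scala
// With isolation.level=read_committed, poll() skips aborted records and
// transaction markers, which produces the offset gaps described above.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
  .option("subscribe", "transactional-topic")           // placeholder
  .option("kafka.isolation.level", "read_committed")
  .option("failOnDataLoss", "false")
  .load()
```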

## How was this patch tested?

The new unit tests.

Closes apache#22042 from zsxwing/kafka-transaction-read.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
@SparkQA

SparkQA commented Aug 28, 2018

Test build #95355 has finished for PR 22205 at commit 7e0f5b2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 28, 2018

Test build #95357 has finished for PR 22205 at commit 0a53247.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 28, 2018

Test build #95375 has finished for PR 22205 at commit d552cc1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Aug 28, 2018

Test build #95368 has finished for PR 22205 at commit f769a94.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 28, 2018

Test build #95370 has finished for PR 22205 at commit cb067c3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Thanks! Merged to master. A JIRA has been created to resolve the issues regarding the tests.

@asfgit asfgit closed this in 1038540 Aug 28, 2018
@SparkQA

SparkQA commented Aug 29, 2018

Test build #95384 has finished for PR 22205 at commit d552cc1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1349,6 +1357,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {

case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
LocalRelation(output, data.take(limit), isStreaming)

case Filter(condition, LocalRelation(output, data, isStreaming))

srowen pushed a commit that referenced this pull request Dec 13, 2018
## What changes were proposed in this pull request?
There are some comment issues left over from when the `ConvertToLocalRelation` rule was added (see #22205/[SPARK-25212](https://issues.apache.org/jira/browse/SPARK-25212)). This PR fixes those comment issues.

## How was this patch tested?
N/A

Closes #23273 from seancxmao/ConvertToLocalRelation-doc.

Authored-by: seancxmao <seancxmao@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
holdenk pushed a commit to holdenk/spark that referenced this pull request Jan 5, 2019
## What changes were proposed in this pull request?
There are some comment issues left over from when the `ConvertToLocalRelation` rule was added (see apache#22205/[SPARK-25212](https://issues.apache.org/jira/browse/SPARK-25212)). This PR fixes those comment issues.

## How was this patch tested?
N/A

Closes apache#23273 from seancxmao/ConvertToLocalRelation-doc.

Authored-by: seancxmao <seancxmao@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?
There are some comment issues left over from when the `ConvertToLocalRelation` rule was added (see apache#22205/[SPARK-25212](https://issues.apache.org/jira/browse/SPARK-25212)). This PR fixes those comment issues.

## How was this patch tested?
N/A

Closes apache#23273 from seancxmao/ConvertToLocalRelation-doc.

Authored-by: seancxmao <seancxmao@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
@ruloweb

ruloweb commented Mar 24, 2019

The early LocalRelation batch helps to fix https://issues.apache.org/jira/browse/SPARK-25212, which still occurs on Spark 2.3. Do you think it makes sense to create a backport?
