Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Branch 2.2 #20569

Closed
wants to merge 585 commits into from
Closed

Branch 2.2 #20569

wants to merge 585 commits into from

Conversation

MohammedLayeeq
Copy link

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Sumedh Wale and others added 30 commits July 6, 2017 14:47
## What changes were proposed in this pull request?

Corrects offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have own SparkPlan implementations and cause EXCHANGE in writes.

## How was this patch tested?

Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over byte array having non-zero offset.

Author: Sumedh Wale <swale@snappydata.io>

Closes #18535 from sumwale/SPARK-21312.

(cherry picked from commit 14a3bb3)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…point dir should be deleted

## What changes were proposed in this pull request?

Stopping query while it is being initialized can throw interrupt exception, in which case temporary checkpoint directories will not be deleted, and the test will fail.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18442 from tdas/DatastreamReaderWriterSuite-fix.

(cherry picked from commit 60043f2)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
## What changes were proposed in this pull request?

Few changes to the Structured Streaming documentation
- Clarify that the entire stream input table is not materialized
- Add information for Ganglia
- Add Kafka Sink to the main docs
- Removed a couple of leftover experimental tags
- Added more associated reading material and talk videos.

In addition, #16856 broke the link to the RDD programming guide in several places while renaming the page. This PR fixes those sameeragarwal cloud-fan.
- Added a redirection to avoid breaking internal and possible external links.
- Removed unnecessary redirection pages that were there since the separate scala, java, and python programming guides were merged together in 2013 or 2014.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18485 from tdas/SPARK-21267.

(cherry picked from commit 0217dfd)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
## What changes were proposed in this pull request?

SPARK-20979 added a new structured streaming source: Rate source. This patch adds the corresponding documentation to programming guide.

## How was this patch tested?

Tested by running jekyll locally.

Author: Prashant Sharma <prashant@apache.org>
Author: Prashant Sharma <prashsh1@in.ibm.com>

Closes #18562 from ScrapCodes/spark-21069/rate-source-docs.

(cherry picked from commit d0bfc67)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
## What changes were proposed in this pull request?

This is backport of #18455
When data type is struct, InSet now uses TypeUtils.getInterpretedOrdering (similar to EqualTo) to build a TreeSet. In other cases it will use a HashSet as before (which should be faster). Similarly, In.eval uses Ordering.equiv instead of equals.

## How was this patch tested?
New test in SQLQuerySuite.

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #18563 from bogdanrdc/SPARK-21228-BRANCH2.2.
…clean up stopped sessions.

`SparkSessionBuilderSuite` should clean up stopped sessions. Otherwise, it leaves behind some stopped `SparkContext`s interfereing with other test suites using `ShardSQLContext`.

Recently, master branch fails consequtively.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/

Pass the Jenkins with a updated suite.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #18567 from dongjoon-hyun/SPARK-SESSION.

(cherry picked from commit 0b8dd2d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…event.

This makes sures that listeners get updated task information; otherwise it's
possible to write incomplete task information into event logs, for example,
making the information in a replayed UI inconsistent with the original
application.

Added a new unit test to try to detect the problem, but it's not guaranteed
to fail since it's a race; but it fails pretty reliably for me without the
scheduler changes.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18393 from vanzin/SPARK-20342.try2.

(cherry picked from commit 9131bdb)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ToMem.

## What changes were proposed in this pull request?

In current code, reducer can break the old shuffle service when `spark.reducer.maxReqSizeShuffleToMem` is enabled. Let's refine document.

Author: jinxing <jinxing6042@126.com>

Closes #18566 from jinxing64/SPARK-21343.

(cherry picked from commit 062c336)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…lyzing empty table

## What changes were proposed in this pull request?

We should be able to store zero size and row count after analyzing empty table.
This is a backport for 9fccc36.

## How was this patch tested?

Added new test.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #18575 from wzhfy/analyzeEmptyTable-2.2.
…cher.

When `RetryingBlockFetcher` retries fetching blocks. There could be two `DownloadCallback`s download the same content to the same target file. It could cause `ShuffleBlockFetcherIterator` reading a partial result.

This pr proposes to create and delete the tmp files in `OneForOneBlockFetcher`

Author: jinxing <jinxing6042@126.com>
Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #18565 from jinxing64/SPARK-21342.

(cherry picked from commit 6a06c4b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

Updating numOutputRows metric was missing from one return path of LeftAnti SortMergeJoin.

## How was this patch tested?

Non-zero output rows manually seen in metrics.

Author: Juliusz Sompolski <julek@databricks.com>

Closes #18494 from juliuszsompolski/SPARK-21272.
## What changes were proposed in this pull request?

Remove all usages of Scala Tuple2 from common/network-* projects. Otherwise, Yarn users cannot use `spark.reducer.maxReqSizeShuffleToMem`.

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #18593 from zsxwing/SPARK-21369.

(cherry picked from commit 833eab2)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

Add sql test for window functions, also remove uncecessary test cases in `WindowQuerySuite`.

## How was this patch tested?

Added `window.sql` and the corresponding output file.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18591 from jiangxb1987/window.

(cherry picked from commit 66d2168)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ndition with blacklisting

There's a race condition in the current TaskSetManager where a failed task is added for retry (addPendingTask), and can asynchronously be assigned to an executor *prior* to the blacklist state (updateBlacklistForFailedTask), the result is the task might re-execute on the same executor.  This is particularly problematic if the executor is shutting down since the retry task immediately becomes a lost task (ExecutorLostFailure).  Another side effect is that the actual failure reason gets obscured by the retry task which never actually executed.  There are sample logs showing the issue in the https://issues.apache.org/jira/browse/SPARK-21219

The fix is to change the ordering of the addPendingTask and updatingBlackListForFailedTask calls in TaskSetManager.handleFailedTask

Implemented a unit test that verifies the task is black listed before it is added to the pending task.  Ran the unit test without the fix and it fails.  Ran the unit test with the fix and it passes.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Eric Vandenberg <ericvandenbergfb.com>

Closes #18427 from ericvandenbergfb/blacklistFix.

## What changes were proposed in this pull request?

This is a backport of the fix to SPARK-21219, already checked in as 96d58f2.

## How was this patch tested?

Ran TaskSetManagerSuite tests locally.

Author: Eric Vandenberg <ericvandenberg@fb.com>

Closes #18604 from jsoltren/branch-2.2.
…oader

## What changes were proposed in this pull request?

`ClassLoader` will preferentially load class from `parent`. Only when `parent` is null or the load failed, that it will call the overridden `findClass` function. To avoid the potential issue caused by loading class using inappropriate class loader, we should set the `parent` of `ClassLoader` to null, so that we can fully control which class loader is used.

This is take over of #17074,  the primary author of this PR is taroplus .

Should close #17074 after this PR get merged.

## How was this patch tested?

Add test case in `ExecutorClassLoaderSuite`.

Author: Kohki Nishio <taroplus@me.com>
Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18614 from jiangxb1987/executor_classloader.

(cherry picked from commit e08d06b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…the staging files in long running scenario

## What changes were proposed in this pull request?

This issue happens in long running application with yarn cluster mode, because yarn#client doesn't sync token with AM, so it will always keep the initial token, this token may be expired in the long running scenario, so when yarn#client tries to clean up staging directory after application finished, it will use this expired token and meet token expire issue.

## How was this patch tested?

Manual verification is secure cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #18617 from jerryshao/SPARK-21376.

(cherry picked from commit cb8d5cc)
…rison

## What changes were proposed in this pull request?

This PR fixes a wrong comparison for `BinaryType`. This PR enables unsigned comparison and unsigned prefix generation for an array for `BinaryType`. Previous implementations uses signed operations.

## How was this patch tested?

Added a test suite in `OrderingSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18571 from kiszk/SPARK-21344.

(cherry picked from commit ac5d5d7)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…-guide redirector

## What changes were proposed in this pull request?

Update internal references from programming-guide to rdd-programming-guide

See apache/spark-website@5ddf243 and #18485 (comment)

Let's keep the redirector even if it's problematic to build, but not rely on it internally.

## How was this patch tested?

(Doc build)

Author: Sean Owen <sowen@cloudera.com>

Closes #18625 from srowen/SPARK-21267.2.

(cherry picked from commit 74ac1fb)
Signed-off-by: Sean Owen <sowen@cloudera.com>
## What changes were proposed in this pull request?

The current code is very verbose on shutdown.

The changes I propose is to change the log level when the driver is shutting down and the RPC connections are closed (RpcEnvStoppedException).

## How was this patch tested?

Tested with word count(deploy-mode = cluster, master = yarn, num-executors = 4) with 300GB of data.

Author: John Lee <jlee2@yahoo-inc.com>

Closes #18547 from yoonlee95/SPARK-21321.

(cherry picked from commit 0e07a29)
Signed-off-by: Tom Graves <tgraves@yahoo-inc.com>
…pressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
    val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
    val df = spark.createDataFrame(rdd, inputSchema)

    // Works correctly since no nested decimal expression is involved
    // Expected result type: (26, 6) * (26, 6) = (38, 12)
    df.select($"col" * $"col").explain(true)
    df.select($"col" * $"col").printSchema()

    // Gives a wrong result since there is a nested decimal expression that should be visited first
    // Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
    df.select($"col" * $"col" * $"col").explain(true)
    df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes #18583 from aokolnychyi/spark-21332.

(cherry picked from commit 0be5fb4)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?

Making those two classes will avoid Serialization issues like below:
```
Caused by: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper
Serialization stack:
    - object not serializable (class: org.apache.spark.unsafe.types.UTF8String$IntWrapper, value: org.apache.spark.unsafe.types.UTF8String$IntWrapper326450e)
    - field (class: org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$1, name: result$2, type: class org.apache.spark.unsafe.types.UTF8String$IntWrapper)
    - object (class org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$1, <function1>)
```

## How was this patch tested?

- [x] Manual testing
- [ ] Unit test

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #18660 from brkyvz/serializableutf8.

(cherry picked from commit 26cd2ca)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ndle partition values with dot

## What changes were proposed in this pull request?

When we list partitions from hive metastore with a partial partition spec, we are expecting exact matching according to the partition values. However, hive treats dot specially and match any single character for dot. We should do an extra filter to drop unexpected partitions.

## How was this patch tested?

new regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18671 from cloud-fan/hive.

(cherry picked from commit f18b905)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?

In `SlidingWindowFunctionFrame`, it is now adding all rows to the buffer for which the input row value is equal to or less than the output row upper bound, then drop all rows from the buffer for which the input row value is smaller than the output row lower bound.
This could result in the buffer is very big though the window is small.
For example:
```
select a, b, sum(a)
over (partition by b order by a range between 1000000 following and 1000001 following)
from table
```
We can refine the logic and just add the qualified rows into buffer.

## How was this patch tested?
Manual test:
Run sql
`select shop, shopInfo, district, sum(revenue) over(partition by district order by revenue range between 100 following and 200 following) from revenueList limit 10`
against a table with 4  columns(shop: String, shopInfo: String, district: String, revenue: Int). The biggest partition is around 2G bytes, containing 200k lines.
Configure the executor with 2G bytes memory.
With the change in this pr, it works find. Without this change, below exception will be thrown.
```
MemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:62)
	at org.apache.spark.sql.execution.window.SlidingWindowFunctionFrame.write(WindowFunctionFrame.scala:201)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:365)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:289)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```

Author: jinxing <jinxing6042@126.com>

Closes #18634 from jinxing64/SPARK-21414.

(cherry picked from commit 4eb081c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…lures in some cases

## What changes were proposed in this pull request?

https://issues.apache.org/jira/projects/SPARK/issues/SPARK-21441

This issue can be reproduced by the following example:

```
val spark = SparkSession
   .builder()
   .appName("smj-codegen")
   .master("local")
   .config("spark.sql.autoBroadcastJoinThreshold", "1")
   .getOrCreate()
val df1 = spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("key", "int")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "3"))).toDF("key", "str")
val df = df1.join(df2, df1("key") === df2("key"))
   .filter("int = 2 or reflect('java.lang.Integer', 'valueOf', str) = 1")
   .select("int")
   df.show()
```

To conclude, the issue happens when:
(1) SortMergeJoin condition contains CodegenFallback expressions.
(2) In PhysicalPlan tree, SortMergeJoin node  is the child of root node, e.g., the Project in above example.

This patch fixes the logic in `CollapseCodegenStages` rule.

## How was this patch tested?
Unit test and manual verification in our cluster.

Author: donnyzone <wellfengzhu@gmail.com>

Closes #18656 from DonnyZone/Fix_SortMergeJoinExec.

(cherry picked from commit 6b6dd68)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ime class

## What changes were proposed in this pull request?

Use of `ProcessingTime` class was deprecated in favor of `Trigger.ProcessingTime` in Spark 2.2. However interval uses to ProcessingTime causes deprecation warnings during compilation. This cannot be avoided entirely as even though it is deprecated as a public API, ProcessingTime instances are used internally in TriggerExecutor. This PR is to minimize the warning by removing its uses from tests as much as possible.

## How was this patch tested?
Existing tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18678 from tdas/SPARK-21464.

(cherry picked from commit 70fe99d)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
## What changes were proposed in this pull request?
JIRA Issue: https://issues.apache.org/jira/browse/SPARK-21446
options.asConnectionProperties can not have fetchsize,because fetchsize belongs to Spark-only options, and Spark-only options have been excluded in connection properities.
So change properties of beforeFetch from  options.asConnectionProperties.asScala.toMap to options.asProperties.asScala.toMap

## How was this patch tested?

Author: DFFuture <albert.zhang23@gmail.com>

Closes #18665 from DFFuture/sparksql_pg.

(cherry picked from commit c972918)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…#joinWith

## What changes were proposed in this pull request?

Two invalid join types were mistakenly listed in the javadoc for joinWith, in the Dataset class. I presume these were copied from the javadoc of join, but since joinWith returns a Dataset\<Tuple2\>, left_semi and left_anti are invalid, as they only return values from one of the datasets, instead of from both

## How was this patch tested?

I ran the following code :
```
public static void main(String[] args) {
	SparkSession spark = new SparkSession(new SparkContext("local[*]", "Test"));
	Dataset<Row> one = spark.createDataFrame(Arrays.asList(new Bean(1), new Bean(2), new Bean(3), new Bean(4), new Bean(5)), Bean.class);
	Dataset<Row> two = spark.createDataFrame(Arrays.asList(new Bean(4), new Bean(5), new Bean(6), new Bean(7), new Bean(8), new Bean(9)), Bean.class);

	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "inner").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "cross").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "full").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "full_outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "right").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "right_outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_semi").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_anti").show();} catch (Exception e) {e.printStackTrace();}
}
```
which tests all the different join types, and the last two (left_semi and left_anti) threw exceptions. The same code using join instead of joinWith did fine. The Bean class was just a java bean with a single int field, x.

Author: Corey Woodfield <coreywoodfield@gmail.com>

Closes #18462 from coreywoodfield/master.

(cherry picked from commit 8cd9cdf)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
For configurations with external shuffle enabled, we have observed that if a very large no. of blocks are being fetched from a remote host, it puts the NM under extra pressure and can crash it. This change introduces a configuration `spark.reducer.maxBlocksInFlightPerAddress` , to limit the no. of map outputs being fetched from a given remote address. The changes applied here are applicable for both the scenarios - when external shuffle is enabled as well as disabled.

Ran the job with the default configuration which does not change the existing behavior and ran it with few configurations of lower values -10,20,50,100. The job ran fine and there is no change in the output. (I will update the metrics related to NM in some time.)

Author: Dhruve Ashar <dhruveashargmail.com>

Closes #18487 from dhruve/impr/SPARK-21243.

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #18691 from dhruve/branch-2.2.
Update the Quickstart and RDD programming guides to mention pip.

Built docs locally.

Author: Holden Karau <holden@us.ibm.com>

Closes #18698 from holdenk/SPARK-21434-add-pyspark-pip-documentation.

(cherry picked from commit cc00e99)
Signed-off-by: Holden Karau <holden@us.ibm.com>
Victsm and others added 29 commits November 30, 2017 19:24
… with Janino when compiling generated code.

## What changes were proposed in this pull request?

Bump up Janino dependency version to fix thread safety issue during compiling generated code

## How was this patch tested?

Check https://issues.apache.org/jira/browse/SPARK-22373 for details.
Converted part of the code in CodeGenerator into a standalone application, so the issue can be consistently reproduced locally.
Verified that changing Janino dependency version resolved this issue.

Author: Min Shen <mshen@linkedin.com>

Closes #19839 from Victsm/SPARK-22373.

(cherry picked from commit 7da1f57)
Signed-off-by: Sean Owen <sowen@cloudera.com>
https://issues.apache.org/jira/browse/SPARK-22653
executorRef.address can be null, pass the executorAddress which accounts for it being null a few lines above the fix.

Manually tested this patch. You can reproduce the issue by running a simple spark-shell in yarn client mode with dynamic allocation and request some executors up front. Let those executors idle timeout. Get a heap dump. Without this fix, you will see that addressToExecutorId still contains the ids, with the fix addressToExecutorId is properly cleaned up.

Author: Thomas Graves <tgraves@oath.com>

Closes #19850 from tgravescs/SPARK-22653.

(cherry picked from commit dc36542)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ding non existing nonlocal file path

## What changes were proposed in this pull request?
When user tries to load data with a non existing hdfs file path system is not validating it and the load command operation is getting successful.
This is misleading to the user. already there is a validation in the scenario of none existing local file path. This PR has added validation in the scenario of nonexisting hdfs file path
## How was this patch tested?
UT has been added for verifying the issue, also snapshots has been added after the verification in a spark yarn cluster

Author: sujith71955 <sujithchacko.2010@gmail.com>

Closes #19823 from sujith71955/master_LoadComand_Issue.

(cherry picked from commit 16adaf6)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
… containing special characters

## What changes were proposed in this pull request?

SPARK-22146 fix the FileNotFoundException issue only for the `inferSchema` method, ie. only for the schema inference, but it doesn't fix the problem when actually reading the data. Thus nearly the same exception happens when someone tries to use the data. This PR covers fixing the problem also there.

## How was this patch tested?

enhanced UT

Author: Marco Gaido <mgaido@hortonworks.com>

Closes #19844 from mgaido91/SPARK-22635.
…ent JobIDs in the RDD commit protocol

I have modified SparkHadoopMapReduceWriter so that executors and the driver always use consistent JobIds during the hadoop commit. Before SPARK-18191, spark always used the rddId, it just incorrectly named the variable stageId. After SPARK-18191, it used the rddId as the jobId on the driver's side, and the stageId as the jobId on the executors' side. With this change executors and the driver will consistently uses rddId as the jobId. Also with this change, during the hadoop commit protocol spark uses  actual stageId to check whether a stage can be committed unlike before that  it was using executors' jobId to do this check.
In addition to the existing unit tests, a test has been added to check whether executors and the driver are using the same JobId. The test failed before this change and passed after applying this fix.

Author: Reza Safi <rezasafi@cloudera.com>

Closes #19886 from rezasafi/stagerdd22.
…tion

## What changes were proposed in this pull request?

During [SPARK-22488](#19713) to fix view resolution issue, there occurs a regression at `2.2.1` and `master` branch like the following. This PR fixes that.

```scala
scala> spark.version
res2: String = 2.2.1

scala> sql("DROP TABLE IF EXISTS t").show
17/12/04 21:01:06 WARN DropTableCommand: org.apache.spark.sql.AnalysisException:
Table or view not found: t;
org.apache.spark.sql.AnalysisException: Table or view not found: t;
```

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19888 from dongjoon-hyun/SPARK-22686.

(cherry picked from commit 82183f7)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This PR upgrade Janino version to 3.0.8. [Janino 3.0.8](https://janino-compiler.github.io/janino/changelog.html) includes an important fix to reduce the number of constant pool entries by using 'sipush' java bytecode.

* SIPUSH bytecode is not used for short integer constant [#33](janino-compiler/janino#33).

Please see detail in [this discussion thread](#19518 (comment)).

Existing tests

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19890 from kiszk/SPARK-22688.

(cherry picked from commit 8ae004b)
Signed-off-by: Sean Owen <sowen@cloudera.com>
## What changes were proposed in this pull request?

Hotfix inadvertent change to xmlbuilder dep when updating Janino.
See backport of #19890

## How was this patch tested?

N/A

Author: Sean Owen <sowen@cloudera.com>

Closes #19922 from srowen/SPARK-22688.2.
…fficients bound)

## What changes were proposed in this pull request?
jira: https://issues.apache.org/jira/browse/SPARK-22289

add JSON encoding/decoding for Param[Matrix].

The issue was reported by Nic Eggert during saving LR model with LowerBoundsOnCoefficients.
There're two ways to resolve this as I see:
1. Support save/load on LogisticRegressionParams, and also adjust the save/load in LogisticRegression and LogisticRegressionModel.
2. Directly support Matrix in Param.jsonEncode, similar to what we have done for Vector.

After some discussion in jira, we prefer the fix to support Matrix as a valid Param type, for simplicity and convenience for other classes.

Note that in the implementation, I added a "class" field in the JSON object to match different JSON converters when loading, which is for preciseness and future extension.

## How was this patch tested?

new unit test to cover the LR case and JsonMatrixConverter

Author: Yuhao Yang <yuhao.yang@intel.com>

Closes #19525 from hhbyyh/lrsave.

(cherry picked from commit 10c27a6)
Signed-off-by: Yanbo Liang <ybliang8@gmail.com>
## What changes were proposed in this pull request?

It solves the problem when submitting a wrong CreateSubmissionRequest to Spark Dispatcher was causing a bad state of Dispatcher and making it inactive as a Mesos framework.

https://issues.apache.org/jira/browse/SPARK-22574

## How was this patch tested?

All spark test passed successfully.

It was tested sending a wrong request (without appArgs) before and after the change. The point is easy, check if the value is null before being accessed.

This was before the change, leaving the dispatcher inactive:

```
Exception in thread "Thread-22" java.lang.NullPointerException
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.getDriverCommandValue(MesosClusterScheduler.scala:444)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.buildDriverCommand(MesosClusterScheduler.scala:451)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.org$apache$spark$scheduler$cluster$mesos$MesosClusterScheduler$$createTaskInfo(MesosClusterScheduler.scala:538)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler$$anonfun$scheduleTasks$1.apply(MesosClusterScheduler.scala:570)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler$$anonfun$scheduleTasks$1.apply(MesosClusterScheduler.scala:555)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.scheduleTasks(MesosClusterScheduler.scala:555)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.resourceOffers(MesosClusterScheduler.scala:621)
```

And after:

```
  "message" : "Malformed request: org.apache.spark.deploy.rest.SubmitRestProtocolException: Validation of message CreateSubmissionRequest failed!\n\torg.apache.spark.deploy.rest.SubmitRestProtocolMessage.validate(SubmitRestProtocolMessage.scala:70)\n\torg.apache.spark.deploy.rest.SubmitRequestServlet.doPost(RestSubmissionServer.scala:272)\n\tjavax.servlet.http.HttpServlet.service(HttpServlet.java:707)\n\tjavax.servlet.http.HttpServlet.service(HttpServlet.java:790)\n\torg.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)\n\torg.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)\n\torg.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)\n\torg.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)\n\torg.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)\n\torg.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)\n\torg.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)\n\torg.spark_project.jetty.server.Server.handle(Server.java:524)\n\torg.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)\n\torg.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)\n\torg.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)\n\torg.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)\n\torg.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)\n\torg.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)\n\torg.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)\n\tjava.lang.Thread.run(Thread.java:745)"
```

Author: German Schiavon <germanschiavon@gmail.com>

Closes #19793 from Gschiavon/fix-submission-request.

(cherry picked from commit 7a51e71)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?

PR closed with all the comments -> #19793

It solves the problem when submitting a wrong CreateSubmissionRequest to Spark Dispatcher was causing a bad state of Dispatcher and making it inactive as a Mesos framework.

https://issues.apache.org/jira/browse/SPARK-22574

## How was this patch tested?

All spark test passed successfully.

It was tested sending a wrong request (without appArgs) before and after the change. The point is easy, check if the value is null before being accessed.

This was before the change, leaving the dispatcher inactive:

```
Exception in thread "Thread-22" java.lang.NullPointerException
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.getDriverCommandValue(MesosClusterScheduler.scala:444)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.buildDriverCommand(MesosClusterScheduler.scala:451)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.org$apache$spark$scheduler$cluster$mesos$MesosClusterScheduler$$createTaskInfo(MesosClusterScheduler.scala:538)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler$$anonfun$scheduleTasks$1.apply(MesosClusterScheduler.scala:570)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler$$anonfun$scheduleTasks$1.apply(MesosClusterScheduler.scala:555)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.scheduleTasks(MesosClusterScheduler.scala:555)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.resourceOffers(MesosClusterScheduler.scala:621)
```

And after:

```
  "message" : "Malformed request: org.apache.spark.deploy.rest.SubmitRestProtocolException: Validation of message CreateSubmissionRequest failed!\n\torg.apache.spark.deploy.rest.SubmitRestProtocolMessage.validate(SubmitRestProtocolMessage.scala:70)\n\torg.apache.spark.deploy.rest.SubmitRequestServlet.doPost(RestSubmissionServer.scala:272)\n\tjavax.servlet.http.HttpServlet.service(HttpServlet.java:707)\n\tjavax.servlet.http.HttpServlet.service(HttpServlet.java:790)\n\torg.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)\n\torg.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)\n\torg.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)\n\torg.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)\n\torg.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)\n\torg.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)\n\torg.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)\n\torg.spark_project.jetty.server.Server.handle(Server.java:524)\n\torg.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)\n\torg.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)\n\torg.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)\n\torg.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)\n\torg.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)\n\torg.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)\n\torg.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)\n\tjava.lang.Thread.run(Thread.java:745)"
```

Author: German Schiavon <germanschiavon@gmail.com>

Closes #19966 from Gschiavon/fix-submission-request.

(cherry picked from commit 0bdb4e5)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?

`testthat` 2.0.0 is released and AppVeyor now started to use it instead of 1.0.2. And then, we started to have R tests failed in AppVeyor. See - https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark/build/1967-master

```
Error in get(name, envir = asNamespace(pkg), inherits = FALSE) :
  object 'run_tests' not found
Calls: ::: -> get
```

This seems because we rely on internal `testthat:::run_tests` here:

https://github.com/r-lib/testthat/blob/v1.0.2/R/test-package.R#L62-L75

https://github.com/apache/spark/blob/dc4c351837879dab26ad8fb471dc51c06832a9e4/R/pkg/tests/run-all.R#L49-L52

However, seems it was removed out from 2.0.0.  I tried few other exposed APIs like `test_dir` but I failed to make a good compatible fix.

Seems we better fix the `testthat` version first to make the build passed.

## How was this patch tested?

Manually tested and AppVeyor tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20003 from HyukjinKwon/SPARK-22817.

(cherry picked from commit c2aeddf)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
…oder

This behavior has confused some users, so lets clarify it.

Author: Michael Armbrust <michael@databricks.com>

Closes #20048 from marmbrus/datasetAsDocs.

(cherry picked from commit 8df1da3)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?

Create table using the right DataFrame. peopleDF->usersDF

peopleDF:
+----+-------+
| age|   name|
+----+-------+
usersDF:
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+

## How was this patch tested?

Manually tested.

Author: CNRui <13266776177@163.com>

Closes #20052 from CNRui/patch-2.

(cherry picked from commit ea2642e)
Signed-off-by: Sean Owen <sowen@cloudera.com>
## What changes were proposed in this pull request?

Since all CRAN checks go through the same machine, if there is an older partial download or partial install of Spark left behind the tests fail. This PR overwrites the install files when running tests. This shouldn't affect Jenkins as `SPARK_HOME` is set when running Jenkins tests.

## How was this patch tested?

Test manually by running `R CMD check --as-cran`

Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>

Closes #20060 from shivaram/sparkr-overwrite-cran.

(cherry picked from commit 1219d7a)
Signed-off-by: Felix Cheung <felixcheung@apache.org>
…ng expressions

## What changes were proposed in this pull request?

The following SQL query should return zero rows, but in Spark it actually returns one row:

```
SELECT 1 from (
  SELECT 1 AS z,
  MIN(a.x)
  FROM (select 1 as x) a
  WHERE false
) b
where b.z != b.z
```

The problem stems from the `PushDownPredicate` rule: when this rule encounters a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, it removes the original filter and adds a new filter onto Aggregate's child, e.g. `Agg(Filter(...))`. This is sometimes okay, but the case above is a counterexample: because there is no explicit `GROUP BY`, we are implicitly computing a global aggregate over the entire table so the original filter was not acting like a `HAVING` clause filtering the number of groups: if we push this filter then it fails to actually reduce the cardinality of the Aggregate output, leading to the wrong answer.

In 2016 I fixed a similar problem involving invalid pushdowns of data-independent filters (filters which reference no columns of the filtered relation). There was additional discussion after my fix was merged which pointed out that my patch was an incomplete fix (see #15289), but it looks I must have either misunderstood the comment or forgot to follow up on the additional points raised there.

This patch fixes the problem by choosing to never push down filters in cases where there are no grouping expressions. Since there are no grouping keys, the only columns are aggregate columns and we can't push filters defined over aggregate results, so this change won't cause us to miss out on any legitimate pushdown opportunities.

## How was this patch tested?

New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #20180 from JoshRosen/SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions.

(cherry picked from commit 2c73d2a)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…enerateUnsafeRowJoiner

## What changes were proposed in this pull request?

This PR fixes a longstanding correctness bug in `GenerateUnsafeRowJoiner`. This class was introduced in #7821 (July 2015 / Spark 1.5.0+) and is used to combine pairs of UnsafeRows in TungstenAggregationIterator, CartesianProductExec, and AppendColumns.

### Bugs fixed by this patch

1. **Incorrect combining of null-tracking bitmaps**: when concatenating two UnsafeRows, the implementation "Concatenate the two bitsets together into a single one, taking padding into account". If one row has no columns then it has a bitset size of 0, but the code was incorrectly assuming that if the left row had a non-zero number of fields then the right row would also have at least one field, so it was copying invalid bytes and and treating them as part of the bitset. I'm not sure whether this bug was also present in the original implementation or whether it was introduced in #7892 (which fixed another bug in this code).
2. **Incorrect updating of data offsets for null variable-length fields**: after updating the bitsets and copying fixed-length and variable-length data, we need to perform adjustments to the offsets pointing the start of variable length fields's data. The existing code was _conditionally_ adding a fixed offset to correct for the new length of the combined row, but it is unsafe to do this if the variable-length field has a null value: we always represent nulls by storing `0` in the fixed-length slot, but this code was incorrectly incrementing those values. This bug was present since the original version of `GenerateUnsafeRowJoiner`.

### Why this bug remained latent for so long

The PR which introduced `GenerateUnsafeRowJoiner` features several randomized tests, including tests of the cases where one side of the join has no fields and where string-valued fields are null. However, the existing assertions were too weak to uncover this bug:

- If a null field has a non-zero value in its fixed-length data slot then this will not cause problems for field accesses because the null-tracking bitmap should still be correct and we will not try to use the incorrect offset for anything.
- If the null tracking bitmap is corrupted by joining against a row with no fields then the corruption occurs in field numbers past the actual field numbers contained in the row. Thus valid `isNullAt()` calls will not read the incorrectly-set bits.

The existing `GenerateUnsafeRowJoinerSuite` tests only exercised `.get()` and `isNullAt()`, but didn't actually check the UnsafeRows for bit-for-bit equality, preventing these bugs from failing assertions. It turns out that there was even a [GenerateUnsafeRowJoinerBitsetSuite](https://github.com/apache/spark/blob/03377d2522776267a07b7d6ae9bddf79a4e0f516/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala) but it looks like it also didn't catch this problem because it only tested the bitsets in an end-to-end fashion by accessing them through the `UnsafeRow` interface instead of actually comparing the bitsets' bytes.

### Impact of these bugs

- This bug will cause `equals()` and `hashCode()` to be incorrect for these rows, which will be problematic in case`GenerateUnsafeRowJoiner`'s results are used as join or grouping keys.
- Chained / repeated invocations of `GenerateUnsafeRowJoiner` may result in reads from invalid null bitmap positions causing fields to incorrectly become NULL (see the end-to-end example below).
  - It looks like this generally only happens in `CartesianProductExec`, which our query optimizer often avoids executing (usually we try to plan a `BroadcastNestedLoopJoin` instead).

### End-to-end test case demonstrating the problem

The following query demonstrates how this bug may result in incorrect query results:

```sql
set spark.sql.autoBroadcastJoinThreshold=-1; -- Needed to trigger CartesianProductExec

create table a as select * from values 1;
create table b as select * from values 2;

SELECT
  t3.col1,
  t1.col1
FROM a t1
CROSS JOIN b t2
CROSS JOIN b t3
```

This should return `(2, 1)` but instead was returning `(null, 1)`.

Column pruning ends up trimming off all columns from `t2`, so when `t2` joins with another table this triggers the bitmap-copying bug. This incorrect bitmap is subsequently copied again when performing the final join, causing the final output to have an incorrectly-set null bit for the first field.

## How was this patch tested?

Strengthened the assertions in existing tests in GenerateUnsafeRowJoinerSuite. Also verified that the end-to-end test case which uncovered this now passes.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #20181 from JoshRosen/SPARK-22984-fix-generate-unsaferow-joiner-bitmap-bugs.

(cherry picked from commit f20131d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…provider org.apache.spark.sql.hive.orc

## What changes were proposed in this pull request?

Fix the warning: Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc.
This PR is for branch-2.2 and cherry-pick from 8032cf8

The old PR is #20165

## How was this patch tested?

 Please see test("SPARK-22972: hive orc source")

Author: xubo245 <601450868@qq.com>

Closes #20195 from xubo245/HiveSerDeForBranch2.2.
… NULL description

## What changes were proposed in this pull request?
When users' DB description is NULL, users might hit `NullPointerException`. This PR is to fix the issue.

## How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20215 from gatorsmile/SPARK-23001.

(cherry picked from commit 87c98de)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…adChannel

## What changes were proposed in this pull request?

This patch fixes a severe asynchronous IO bug in Spark's Netty-based file transfer code. At a high-level, the problem is that an unsafe asynchronous `close()` of a pipe's source channel creates a race condition where file transfer code closes a file descriptor then attempts to read from it. If the closed file descriptor's number has been reused by an `open()` call then this invalid read may cause unrelated file operations to return incorrect results. **One manifestation of this problem is incorrect query results.**

For a high-level overview of how file download works, take a look at the control flow in `NettyRpcEnv.openChannel()`: this code creates a pipe to buffer results, then submits an asynchronous stream request to a lower-level TransportClient. The callback passes received data to the sink end of the pipe. The source end of the pipe is passed back to the caller of `openChannel()`. Thus `openChannel()` returns immediately and callers interact with the returned pipe source channel.

Because the underlying stream request is asynchronous, errors may occur after `openChannel()` has returned and after that method's caller has started to `read()` from the returned channel. For example, if a client requests an invalid stream from a remote server then the "stream does not exist" error may not be received from the remote server until after `openChannel()` has returned. In order to be able to propagate the "stream does not exist" error to the file-fetching application thread, this code wraps the pipe's source channel in a special `FileDownloadChannel` which adds an `setError(t: Throwable)` method, then calls this `setError()` method in the FileDownloadCallback's `onFailure` method.

It is possible for `FileDownloadChannel`'s `read()` and `setError()` methods to be called concurrently from different threads: the `setError()` method is called from within the Netty RPC system's stream callback handlers, while the `read()` methods are called from higher-level application code performing remote stream reads.

The problem lies in `setError()`: the existing code closed the wrapped pipe source channel. Because `read()` and `setError()` occur in different threads, this means it is possible for one thread to be calling `source.read()` while another asynchronously calls `source.close()`. Java's IO libraries do not guarantee that this will be safe and, in fact, it's possible for these operations to interleave in such a way that a lower-level `read()` system call occurs right after a `close()` call. In the best-case, this fails as a read of a closed file descriptor; in the worst-case, the file descriptor number has been re-used by an intervening `open()` operation and the read corrupts the result of an unrelated file IO operation being performed by a different thread.

The solution here is to remove the `stream.close()` call in `onError()`: the thread that is performing the `read()` calls is responsible for closing the stream in a `finally` block, so there's no need to close it here. If that thread is blocked in a `read()` then it will become unblocked when the sink end of the pipe is closed in `FileDownloadCallback.onFailure()`.

After making this change, we also need to refine the `read()` method to always check for a `setError()` result, even if the underlying channel `read()` call has succeeded.

This patch also makes a slight cleanup to a dodgy-looking `catch e: Exception` block to use a safer `try-finally` error handling idiom.

This bug was introduced in SPARK-11956 / #9941 and is present in Spark 1.6.0+.

## How was this patch tested?

This fix was tested manually against a workload which non-deterministically hit this bug.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #20179 from JoshRosen/SPARK-22982-fix-unsafe-async-io-in-file-download-channel.

(cherry picked from commit edf0a48)
Signed-off-by: Sean Owen <sowen@cloudera.com>
…re was no progress reported

## What changes were proposed in this pull request?

`MetricsReporter ` assumes that there has been some progress for the query, ie. `lastProgress` is not null. If this is not true, as it might happen in particular conditions, a `NullPointerException` can be thrown.

The PR checks whether there is a `lastProgress` and if this is not true, it returns a default value for the metrics.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20189 from mgaido91/SPARK-22975.

(cherry picked from commit 5427739)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?

This PR aims to update the followings in `docker/spark-test`.

- JDK7 -> JDK8
Spark 2.2+ supports JDK8 only.

- Ubuntu 12.04.5 LTS(precise) -> Ubuntu 16.04.3 LTS(xeniel)
The end of life of `precise` was April 28, 2017.

## How was this patch tested?

Manual.

* Master
```
$ cd external/docker
$ ./build
$ export SPARK_HOME=...
$ docker run -v $SPARK_HOME:/opt/spark spark-test-master
CONTAINER_IP=172.17.0.3
...
18/01/11 06:50:25 INFO MasterWebUI: Bound MasterWebUI to 172.17.0.3, and started at http://172.17.0.3:8080
18/01/11 06:50:25 INFO Utils: Successfully started service on port 6066.
18/01/11 06:50:25 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066
18/01/11 06:50:25 INFO Master: I have been elected leader! New state: ALIVE
```

* Slave
```
$ docker run -v $SPARK_HOME:/opt/spark spark-test-worker spark://172.17.0.3:7077
CONTAINER_IP=172.17.0.4
...
18/01/11 06:51:54 INFO Worker: Successfully registered with master spark://172.17.0.3:7077
```

After slave starts, master will show
```
18/01/11 06:51:54 INFO Master: Registering worker 172.17.0.4:8888 with 4 cores, 1024.0 MB RAM
```

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20230 from dongjoon-hyun/SPARK-23038.

(cherry picked from commit 7a3d0aa)
Signed-off-by: Felix Cheung <felixcheung@apache.org>
…til.NoSuchElementException

## What changes were proposed in this pull request?
The following SQL involving scalar correlated query returns a map exception.
``` SQL
SELECT t1a
FROM   t1
WHERE  t1a = (SELECT   count(*)
              FROM     t2
              WHERE    t2c = t1c
              HAVING   count(*) >= 1)
```
``` SQL
key not found: ExprId(278,786682bb-41f9-4bd5-a397-928272cc8e4e)
java.util.NoSuchElementException: key not found: ExprId(278,786682bb-41f9-4bd5-a397-928272cc8e4e)
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:59)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:59)
        at org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery$.org$apache$spark$sql$catalyst$optimizer$RewriteCorrelatedScalarSubquery$$evalSubqueryOnZeroTups(subquery.scala:378)
        at org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery$$anonfun$org$apache$spark$sql$catalyst$optimizer$RewriteCorrelatedScalarSubquery$$constructLeftJoins$1.apply(subquery.scala:430)
        at org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery$$anonfun$org$apache$spark$sql$catalyst$optimizer$RewriteCorrelatedScalarSubquery$$constructLeftJoins$1.apply(subquery.scala:426)
```

In this case, after evaluating the HAVING clause "count(*) > 1" statically
against the binding of aggregtation result on empty input, we determine
that this query will not have a the count bug. We should simply return
the evalSubqueryOnZeroTups with empty value.
(Please fill in changes proposed in this fix)

## How was this patch tested?
A new test was added in the Subquery bucket.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #20283 from dilipbiswal/scalar-count-defect.

(cherry picked from commit 0c2ba42)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…integration document

## What changes were proposed in this pull request?

In latest structured-streaming-kafka-integration document, Java code example for Kafka integration is using `DataFrame<Row>`, shouldn't it be changed to `DataSet<Row>`?

## How was this patch tested?

manual test has been performed to test the updated example Java code in Spark 2.2.1 with Kafka 1.0

Author: brandonJY <brandonJY@users.noreply.github.com>

Closes #20312 from brandonJY/patch-2.

(cherry picked from commit 6121e91)
Signed-off-by: Sean Owen <sowen@cloudera.com>
…omposite order by clause refers to both original columns and aliases

## What changes were proposed in this pull request?
Here is the test snippet.
``` SQL
scala> Seq[(Integer, Integer)](
     |         (1, 1),
     |         (1, 3),
     |         (2, 3),
     |         (3, 3),
     |         (4, null),
     |         (5, null)
     |       ).toDF("key", "value").createOrReplaceTempView("src")

scala> sql(
     |         """
     |           |SELECT MAX(value) as value, key as col2
     |           |FROM src
     |           |GROUP BY key
     |           |ORDER BY value desc, key
     |         """.stripMargin).show
+-----+----+
|value|col2|
+-----+----+
|    3|   3|
|    3|   2|
|    3|   1|
| null|   5|
| null|   4|
+-----+----+
```SQL
Here is the explain output :

```SQL
== Parsed Logical Plan ==
'Sort ['value DESC NULLS LAST, 'key ASC NULLS FIRST], true
+- 'Aggregate ['key], ['MAX('value) AS value#9, 'key AS col2#10]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
value: int, col2: int
Project [value#9, col2#10]
+- Sort [value#9 DESC NULLS LAST, col2#10 DESC NULLS LAST], true
   +- Aggregate [key#5], [max(value#6) AS value#9, key#5 AS col2#10]
      +- SubqueryAlias src
         +- Project [_1#2 AS key#5, _2#3 AS value#6]
            +- LocalRelation [_1#2, _2#3]
``` SQL
The sort direction is being wrongly changed from ASC to DSC while resolving ```Sort``` in
resolveAggregateFunctions.

The above testcase models TPCDS-Q71 and thus we have the same issue in Q71 as well.

## How was this patch tested?
A few tests are added in SQLQuerySuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #20453 from dilipbiswal/local_spark.
…8, it will result in an error result

## What changes were proposed in this pull request?
In the `checkIndexAndDataFile`,the `blocks` is the ` Int` type,  when it is greater than 2^28, `blocks*8` will overflow, and this will result in an error result.
In fact, `blocks` is actually the number of partitions.

## How was this patch tested?
Manual test

Author: liuxian <liu.xian3@zte.com.cn>

Closes #20544 from 10110346/overflow.

(cherry picked from commit f77270b)
Signed-off-by: Sean Owen <sowen@cloudera.com>
…loading JDBC Drivers

## What changes were proposed in this pull request?

Since some JDBC Drivers have class initialization code to call `DriverManager`, we need to initialize `DriverManager` first in order to avoid potential executor-side **deadlock** situations like the following (or [STORM-2527](https://issues.apache.org/jira/browse/STORM-2527)).

```
Thread 9587: (state = BLOCKED)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance0(java.lang.reflect.Constructor, java.lang.Object[]) bci=0 (Compiled frame; information may be imprecise)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=85, line=62 (Compiled frame)
 - sun.reflect.DelegatingConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=5, line=45 (Compiled frame)
 - java.lang.reflect.Constructor.newInstance(java.lang.Object[]) bci=79, line=423 (Compiled frame)
 - java.lang.Class.newInstance() bci=138, line=442 (Compiled frame)
 - java.util.ServiceLoader$LazyIterator.nextService() bci=119, line=380 (Interpreted frame)
 - java.util.ServiceLoader$LazyIterator.next() bci=11, line=404 (Interpreted frame)
 - java.util.ServiceLoader$1.next() bci=37, line=480 (Interpreted frame)
 - java.sql.DriverManager$2.run() bci=21, line=603 (Interpreted frame)
 - java.sql.DriverManager$2.run() bci=1, line=583 (Interpreted frame)
 - java.security.AccessController.doPrivileged(java.security.PrivilegedAction) bci=0 (Compiled frame)
 - java.sql.DriverManager.loadInitialDrivers() bci=27, line=583 (Interpreted frame)
 - java.sql.DriverManager.<clinit>() bci=32, line=101 (Interpreted frame)
 - org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(java.lang.String, java.lang.Integer, java.lang.String, java.util.Properties) bci=12, line=98 (Interpreted frame)
 - org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(org.apache.hadoop.conf.Configuration, java.util.Properties) bci=22, line=57 (Interpreted frame)
 - org.apache.phoenix.mapreduce.PhoenixInputFormat.getQueryPlan(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.conf.Configuration) bci=61, line=116 (Interpreted frame)
 - org.apache.phoenix.mapreduce.PhoenixInputFormat.createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext) bci=10, line=71 (Interpreted frame)
 - org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(org.apache.spark.rdd.NewHadoopRDD, org.apache.spark.Partition, org.apache.spark.TaskContext) bci=233, line=156 (Interpreted frame)

Thread 9170: (state = BLOCKED)
 - org.apache.phoenix.jdbc.PhoenixDriver.<clinit>() bci=35, line=125 (Interpreted frame)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance0(java.lang.reflect.Constructor, java.lang.Object[]) bci=0 (Compiled frame)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=85, line=62 (Compiled frame)
 - sun.reflect.DelegatingConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=5, line=45 (Compiled frame)
 - java.lang.reflect.Constructor.newInstance(java.lang.Object[]) bci=79, line=423 (Compiled frame)
 - java.lang.Class.newInstance() bci=138, line=442 (Compiled frame)
 - org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(java.lang.String) bci=89, line=46 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply() bci=7, line=53 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply() bci=1, line=52 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD, org.apache.spark.Partition, org.apache.spark.TaskContext) bci=81, line=347 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) bci=7, line=339 (Interpreted frame)
```

## How was this patch tested?

N/A

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20563 from dongjoon-hyun/SPARK-23186-2.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment