
Unify GraphImpl RDDs + other graph load optimizations #497

Closed

Conversation

ankurdave (Contributor)

This PR makes the following changes, primarily in e4fbd32:

  1. Unify RDDs to avoid zipPartitions. A graph used to be four RDDs: vertices, edges, routing table, and triplet view. This commit merges them down to two: vertices (with routing table), and edges (with replicated vertices).
  2. Avoid duplicate shuffle in graph building. We used to do two shuffles when building a graph: one to extract routing information from the edges and move it to the vertices, and another to find nonexistent vertices referred to by edges. With this commit, the latter is done as a side effect of the former.
  3. Avoid no-op shuffle when joins are fully eliminated. This is a side effect of unifying the edges and the triplet view.
  4. Join elimination for mapTriplets.
  5. Ship only the needed vertex attributes when upgrading the triplet view. If the triplet view already contains source attributes, and we now need both attributes, only ship destination attributes rather than re-shipping both. This is done in ReplicatedVertexView#upgrade.
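
A minimal sketch of the shipping decision behind item 5; `ReplicatedView`, `hasSrcIds`/`hasDstIds`, and `upgradePlan` are illustrative stand-ins for the `ReplicatedVertexView` internals, not its actual API:

```scala
// Decide which attribute sides must be shipped to edge partitions, given
// what the replicated view already holds: ship a side only if it is
// requested and not already replicated.
case class ReplicatedView(hasSrcIds: Boolean, hasDstIds: Boolean)

def upgradePlan(view: ReplicatedView, includeSrc: Boolean, includeDst: Boolean): (Boolean, Boolean) =
  (includeSrc && !view.hasSrcIds, includeDst && !view.hasDstIds)

// A src-only view upgraded to a full view ships destination attributes alone:
assert(upgradePlan(ReplicatedView(hasSrcIds = true, hasDstIds = false),
                   includeSrc = true, includeDst = true) == (false, true))
```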

@ankurdave (Contributor, Author)

cc @rxin @jegonzal

@AmplabJenkins: Merged build finished. Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14355/

@AmplabJenkins: Merged build finished. Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14356/

@AmplabJenkins: Merged build finished. Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14358/

@mateiz (Contributor) commented May 6, 2014:

Jenkins, retest this please

@AmplabJenkins: Build triggered. Build started.

```diff
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }

   override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+
+  def toTuple = ((srcId, srcAttr), (dstId, dstAttr), attr)
```
A Contributor left an inline review comment on the added line:

Add an explicit return type to this
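
One way to satisfy the request (the tuple type follows directly from the triplet's fields; the squashed commit 75af062, "Add explicit return types", appears to cover this):

```scala
// EdgeTriplet#toTuple with an explicit return type: each endpoint is a
// (VertexId, vertex attribute) pair, followed by the edge attribute.
def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)
```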

@mateiz (Contributor) commented May 10, 2014:

Jenkins, test this please

@AmplabJenkins: Merged build finished. Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14873/

Also update RoutingTableMessageSerializer to pass ClassTags.
@AmplabJenkins: Merged build finished. All automated tests passed. Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14874/

@pwendell (Contributor):

Thanks everyone - I'm going to pull this in.

asfgit closed this in 905173d on May 10, 2014
asfgit pushed a commit that referenced this pull request May 10, 2014
Author: Ankur Dave <ankurdave@gmail.com>

Closes #497 from ankurdave/unify-rdds and squashes the following commits:

332ab43 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
4933e2e [Ankur Dave] Exclude RoutingTable from binary compatibility check
5ba8789 [Ankur Dave] Add GraphX upgrade guide from Spark 0.9.1
13ac845 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
a04765c [Ankur Dave] Remove unnecessary toOps call
57202e8 [Ankur Dave] Replace case with pair parameter
75af062 [Ankur Dave] Add explicit return types
04d3ae5 [Ankur Dave] Convert implicit parameter to context bound
c88b269 [Ankur Dave] Revert upgradeIterator to if-in-a-loop
0d3584c [Ankur Dave] EdgePartition.size should be val
2a928b2 [Ankur Dave] Set locality wait
10b3596 [Ankur Dave] Clean up public API
ae36110 [Ankur Dave] Fix style errors
e4fbd32 [Ankur Dave] Unify GraphImpl RDDs + other graph load optimizations
d6d60e2 [Ankur Dave] In GraphLoader, coalesce to minEdgePartitions
62c7b78 [Ankur Dave] In Analytics, take PageRank numIter
d64e8d4 [Ankur Dave] Log current Pregel iteration
(cherry picked from commit 905173d)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
pwendell pushed a commit to pwendell/spark that referenced this pull request May 12, 2014
Updated Spark Streaming Programming Guide

Here is the updated version of the Spark Streaming Programming Guide. This is still a work in progress, but the major changes are in place, so feedback is most welcome.

In general, I have tried to make the guide easier to understand even if the reader does not know much about Spark. The updated website is hosted here:

http://www.eecs.berkeley.edu/~tdas/spark_docs/streaming-programming-guide.html

The major changes are:
- Overview illustrates the use cases of Spark Streaming - various input sources and various output sources
- An example right after the overview to quickly give an idea of what a Spark Streaming program looks like
- Made the Java API and examples first-class citizens like Scala by using tabs to show both Scala and Java examples (similar to the AMPCamp tutorial's code tabs)
- Highlighted the DStream operations updateStateByKey and transform because of their powerful nature
- Updated driver node failure recovery text to highlight automatic recovery in Spark standalone mode
- Added information about linking and using the external input sources like Kafka and Flume
- In general, reorganized the sections to better show the Basic section and the more advanced sections like Tuning and Recovery.

Todos:
- Links to the docs of external Kafka, Flume, etc
- Illustrate window operation with figure as well as example.

Author: Tathagata Das <tathagata.das1565@gmail.com>

== Merge branch commits ==

commit 18ff10556570b39d672beeb0a32075215cfcc944
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Tue Jan 28 21:49:30 2014 -0800

    Fixed a lot of broken links.

commit 34a5a6008dac2e107624c7ff0db0824ee5bae45f
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Tue Jan 28 18:02:28 2014 -0800

    Updated github url to use SPARK_GITHUB_URL variable.

commit f338a60ae8069e0a382d2cb170227e5757cc0b7a
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Mon Jan 27 22:42:42 2014 -0800

    More updates based on Patrick and Harvey's comments.

commit 89a81ff25726bf6d26163e0dd938290a79582c0f
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Mon Jan 27 13:08:34 2014 -0800

    Updated docs based on Patricks PR comments.

commit d5b6196b532b5746e019b959a79ea0cc013a8fc3
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Sun Jan 26 20:15:58 2014 -0800

    Added spark.streaming.unpersist config and info on StreamingListener interface.

commit e3dcb46ab83d7071f611d9b5008ba6bc16c9f951
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Sun Jan 26 18:41:12 2014 -0800

    Fixed docs on StreamingContext.getOrCreate.

commit 6c29524639463f11eec721e4d17a9d7159f2944b
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Thu Jan 23 18:49:39 2014 -0800

    Added example and figure for window operations, and links to Kafka and Flume API docs.

commit f06b964a51bb3b21cde2ff8bdea7d9785f6ce3a9
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Wed Jan 22 22:49:12 2014 -0800

    Fixed missing endhighlight tag in the MLlib guide.

commit 036a7d46187ea3f2a0fb8349ef78f10d6c0b43a9
Merge: eab351d a1cd185
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Wed Jan 22 22:17:42 2014 -0800

    Merge remote-tracking branch 'apache/master' into docs-update

commit eab351d05c0baef1d4b549e1581310087158d78d
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Wed Jan 22 22:17:15 2014 -0800

    Update Spark Streaming Programming Guide.
asfgit pushed a commit that referenced this pull request Jun 6, 2014
Due to a bug introduced by #497, Pregel does not unpersist replicated vertices from previous iterations. As a result, they stay cached until memory is full, wasting GC time.

This PR corrects the problem by unpersisting both the edges and the replicated vertices of previous iterations. This is safe because the edges and replicated vertices of the current iteration are cached by the call to `g.cache()` and then materialized by the call to `messages.count()`. Therefore no unmaterialized RDDs depend on `prevG.edges`. I verified that no recomputation occurs by running PageRank with a custom patch to Spark that warns when a partition is recomputed.
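
A condensed, illustrative sketch of the loop body described above (simplified; not the literal Pregel source):

```scala
import org.apache.spark.graphx._

// One simplified Pregel iteration: cache and materialize the new graph and
// messages, then drop BOTH halves of the previous graph. Unpersisting only
// the vertices was the bug; the edges.unpersist call is the fix.
def iterate(g: Graph[Double, Int], messages: VertexRDD[Double]): Graph[Double, Int] = {
  val newG = g.joinVertices(messages)((_, attr, msg) => attr + msg).cache()
  val newMessages =
    newG.mapReduceTriplets[Double](et => Iterator((et.dstId, et.srcAttr)), _ + _).cache()
  newMessages.count()                    // materializes newG's edges and replicated vertices
  g.unpersistVertices(blocking = false)  // previous iteration's replicated vertices
  g.edges.unpersist(blocking = false)    // previous iteration's edges (the missing call)
  newG
}
```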

Thanks to Tim Weninger for reporting this bug.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #972 from ankurdave/SPARK-2025 and squashes the following commits:

13d5b07 [Ankur Dave] Unpersist edges of previous graph in Pregel
asfgit pushed a commit that referenced this pull request Jun 6, 2014 (the same SPARK-2025 fix, cherry picked from commit 9bad0b7; Signed-off-by: Reynold Xin <rxin@apache.org>)
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014 (the same squashed merge commit as above; closes apache#497 from ankurdave/unify-rdds)
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014 (the same SPARK-2025 fix as above; closes apache#972 from ankurdave/SPARK-2025)
gzm55 pushed a commit to MediaV/spark that referenced this pull request Jul 17, 2014 (the same Spark Streaming Programming Guide update as above, cherry picked from commit 7930209, with conflicts in docs/mllib-guide.md)
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014 (the same SPARK-2025 fix as above; closes apache#972 from ankurdave/SPARK-2025)
andrewor14 pushed a commit to andrewor14/spark that referenced this pull request Jan 8, 2015 (the same Spark Streaming Programming Guide update as above, cherry picked from commit 7930209, with conflicts in docs/mllib-guide.md)
mccheah referenced this pull request in palantir/spark Sep 26, 2017
* Rename package to k8s

* Rename string constants
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Nov 7, 2017 (the same k8s rename commit)
arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020
* [SPARK-26709][SQL] OptimizeMetadataOnlyQuery does not handle empty records correctly

## What changes were proposed in this pull request?

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
The result is supposed to be `null`. However, with the optimization the result is `5`.

The rule was originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in apache#13494. In Hive, the rule was disabled by default in a later release (https://issues.apache.org/jira/browse/HIVE-15397) due to the same problem.

It is hard to completely avoid the correctness issue, because data sources like Parquet can be metadata-only, and Spark can't tell whether a table is empty without actually reading it. This PR disables the optimization by default.

## How was this patch tested?

Unit test

Closes apache#23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Co-authored-by: Xiao Li <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
(cherry picked from commit f5b9370)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>

* [SPARK-26080][PYTHON] Skips Python resource limit on Windows in Python worker

## What changes were proposed in this pull request?

`resource` package is a Unix specific package. See https://docs.python.org/2/library/resource.html and https://docs.python.org/3/library/resource.html.

Note that we document Windows support:

> Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS).

This should be backported into branch-2.4 to restore Windows support in Spark 2.4.1.

## How was this patch tested?

Manually mocked the changed logic.

Closes apache#23055 from HyukjinKwon/SPARK-26080.

Lead-authored-by: hyukjinkwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 9cda9a8)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

* [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.

## What changes were proposed in this pull request?

Updates FileFormatWriter to create a consistent Hadoop Job ID for a write.
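
A hedged illustration of the approach (hypothetical helper, not the actual FileFormatWriter code): capture one timestamp per write and derive every Hadoop job ID from it, so all participants agree on the ID regardless of when they construct it.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import org.apache.hadoop.mapreduce.JobID

// One timestamp captured when the write begins...
val jobTrackerId: String = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())

// ...reused for every JobID in this write, instead of calling new Date()
// independently at each construction site.
def hadoopJobId(stageId: Int): JobID = new JobID(jobTrackerId, stageId)
```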

## How was this patch tested?

Existing tests for regressions.

Closes apache#23777 from rdblue/SPARK-26873-fix-file-format-writer-job-ids.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 33334e2)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of apache#21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be 0 records, 1 record, or multiple records, depending on each input.

See also apache#23665 (comment).

## How was this patch tested?

Manually tested.

Closes apache#23708 from HyukjinKwon/SPARK-26745-backport.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

* [SPARK-26677][BUILD] Update Parquet to 1.10.1 with notEq pushdown fix.

## What changes were proposed in this pull request?

Update to Parquet Java 1.10.1.

## How was this patch tested?

Added a test from HyukjinKwon that validates the notEq case from SPARK-26677.

Closes apache#23704 from rdblue/SPARK-26677-fix-noteq-parquet-bug.

Lead-authored-by: Ryan Blue <blue@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Ryan Blue <rdblue@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit f72d217)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

* [SPARK-26677][FOLLOWUP][BRANCH-2.4] Update Parquet manifest with Hadoop-2.6

## What changes were proposed in this pull request?

While merging the Parquet upgrade PR, the `hadoop-2.6` profile dependency manifest was missed.

## How was this patch tested?

Manual.

```
./dev/test-dependencies.sh
```

Also, this will recover `branch-2.4` with `hadoop-2.6` build.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.4-test-sbt-hadoop-2.6/281/

Closes apache#23738 from dongjoon-hyun/SPARK-26677-2.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

* [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

## What changes were proposed in this pull request?

When performing non-cascading cache invalidation, `recache` is called on the other cache entries which are dependent on the cache being invalidated. This leads to the physical plans of those cache entries being re-compiled. For those cache entries, if the cached RDD has already been persisted, chances are there will be inconsistency between the data and the new plan. It can cause a correctness issue if the new plan's `outputPartitioning` or `outputOrdering` is different from that of the actual data, and meanwhile the cache is used by another query that asks for a specific `outputPartitioning` or `outputOrdering` which happens to match the new plan but not the actual data.

The fix is to keep the cache entry as it is if the data has been loaded; otherwise, re-build the cache entry with a new plan and an empty cache buffer.
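
A toy sketch of that policy (illustrative types only; Spark's cache manager API differs):

```scala
// Keep an entry whose buffer is already materialized, so the data and the
// plan it was built with stay consistent; rebuild (fresh plan, empty buffer)
// only when nothing has been loaded yet.
case class CacheEntry[P](plan: P, dataLoaded: Boolean)

def recache[P](entry: CacheEntry[P], recompile: P => P): CacheEntry[P] =
  if (entry.dataLoaded) entry
  else CacheEntry(recompile(entry.plan), dataLoaded = false)
```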

## How was this patch tested?

Added UT.

Closes apache#23678 from maryannxue/spark-26708-2.4.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>

* [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)

## What changes were proposed in this pull request?

Backport apache#23324 to branch-2.4.

## How was this patch tested?

Jenkins

Closes apache#23365 from zsxwing/SPARK-26267-2.4.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>

* [SPARK-26706][SQL] Fix `illegalNumericPrecedence` for ByteType

This PR contains a minor change in `Cast$mayTruncate` that fixes its logic for bytes.

Right now, `mayTruncate(ByteType, LongType)` returns `false` while `mayTruncate(ShortType, LongType)` returns `true`. Consequently, `spark.range(1, 3).as[Byte]` and `spark.range(1, 3).as[Short]` behave differently.

Potentially, this bug can silently corrupt someone's data.
```scala
// executes silently even though Long is converted into Byte
spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
  .map(b => b - 1)
  .show()
+-----+
|value|
+-----+
|  -12|
|  -11|
|  -10|
|   -9|
|   -8|
|   -7|
|   -6|
|   -5|
|   -4|
|   -3|
+-----+
// throws an AnalysisException: Cannot up cast `id` from bigint to smallint as it may truncate
spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
  .map(s => s - 1)
  .show()
```

This PR comes with a set of unit tests.

Closes apache#23632 from aokolnychyi/cast-fix.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>

* [SPARK-26078][SQL][BACKPORT-2.4] Dedup self-join attributes on IN subqueries

## What changes were proposed in this pull request?

When there is a self-join as the result of an IN subquery, the join condition may be invalid, resulting in trivially true predicates and wrong results.

The PR deduplicates the subquery output in order to avoid the issue.
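
A hypothetical spark-shell example of the query shape involved (illustrative only, not the PR's regression test):

```scala
// The IN subquery over the same table is planned as a self-join; without
// deduplicating the subquery's output attributes, both sides can carry the
// same attribute IDs and the join condition degenerates to "always true".
spark.range(10).toDF("a").createOrReplaceTempView("t")
spark.sql("SELECT a FROM t WHERE a IN (SELECT a FROM t WHERE a > 5)").show()
```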

## How was this patch tested?

added UT

Closes apache#23449 from mgaido91/SPARK-26078_2.4.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

* [SPARK-26233][SQL][BACKPORT-2.4] CheckOverflow when encoding a decimal value

## What changes were proposed in this pull request?

When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations.

## How was this patch tested?

added UT

Closes apache#23232 from mgaido91/SPARK-26233_2.4.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

* [SPARK-27097][CHERRY-PICK 2.4] Avoid embedding platform-dependent offsets literally in whole-stage generated code

## What changes were proposed in this pull request?

Spark SQL performs whole-stage code generation to speed up query execution. There are two steps to it:
- Java source code is generated from the physical query plan on the driver. A single version of the source code is generated from a query plan, and sent to all executors.
  - It's compiled to bytecode on the driver to catch compilation errors before sending to executors, but currently only the generated source code gets sent to the executors. The bytecode compilation is for fail-fast only.
- Executors receive the generated source code and compile to bytecode, then the query runs like a hand-written Java program.

In this model, there's an implicit assumption about the driver and executors being run on similar platforms. Some code paths accidentally embedded platform-dependent object layout information into the generated code, such as:
```java
Platform.putLong(buffer, /* offset */ 24, /* value */ 1);
```
This code expects a field to be at offset +24 of the `buffer` object, and sets a value to that field.
But whole-stage code generation generally uses platform-dependent information from the driver. If the object layout is significantly different on the driver and executors, the generated code can be reading/writing to wrong offsets on the executors, causing all kinds of data corruption.

One code pattern that leads to such problem is the use of `Platform.XXX` constants in generated code, e.g. `Platform.BYTE_ARRAY_OFFSET`.

Bad:
```scala
val baseOffset = Platform.BYTE_ARRAY_OFFSET
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will embed the value of `Platform.BYTE_ARRAY_OFFSET` on the driver into the generated code.

Good:
```scala
val baseOffset = "Platform.BYTE_ARRAY_OFFSET"
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will generate the offset symbolically -- `Platform.putLong(buffer, Platform.BYTE_ARRAY_OFFSET, value)`, which will be able to pick up the correct value on the executors.

Caveat: these offset constants are declared as runtime-initialized `static final` in Java, so they're not compile-time constants from the Java language's perspective. It does lead to a slightly increased size of the generated code, but this is necessary for correctness.

NOTE: there can be other patterns that generate platform-dependent code on the driver which is invalid on the executors. e.g. if the endianness is different between the driver and the executors, and if some generated code makes strong assumption about endianness, it would also be problematic.

## How was this patch tested?

Added a new test suite `WholeStageCodegenSparkSubmitSuite`. This test suite needs to set the driver's extraJavaOptions to force the driver and executor use different Java object layouts, so it's run as an actual SparkSubmit job.

Authored-by: Kris Mok <kris.mok@databricks.com>

Closes apache#24032 from gatorsmile/testFailure.

Lead-authored-by: Kris Mok <kris.mok@databricks.com>
Co-authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>

* [SPARK-26188][SQL] FileIndex: don't infer data types of partition columns if user specifies schema

## What changes were proposed in this pull request?

This PR is to fix a regression introduced in: https://github.com/apache/spark/pull/21004/files#r236998030

If the user specifies a schema, Spark doesn't need to infer data types for partition columns; otherwise the inferred type might not match the one the user provided.
E.g. for partition directory `p=4d`, after data type inference the column value will be `4.0`.
See https://issues.apache.org/jira/browse/SPARK-26188 for more details.

Note that user specified schema **might not cover all the data columns**:
```
val schema = new StructType()
  .add("id", StringType)
  .add("ex", ArrayType(StringType))
val df = spark.read
  .schema(schema)
  .format("parquet")
  .load(src.toString)

assert(df.schema.toList === List(
  StructField("ex", ArrayType(StringType)),
  StructField("part", IntegerType), // inferred partitionColumn dataType
  StructField("id", StringType))) // used user provided partitionColumn dataType
```
For the missing columns in user specified schema, Spark still need to infer their data types if `partitionColumnTypeInferenceEnabled` is enabled.

To implement the partially inference, refactor `PartitioningUtils.parsePartitions`  and pass the user specified schema as parameter to cast partition values.

## How was this patch tested?

Add unit test.

Closes apache#23165 from gengliangwang/fixFileIndex.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 9cfc3ee)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-25921][PYSPARK] Fix barrier task run without BarrierTaskContext while python worker reuse

## What changes were proposed in this pull request?

Running a barrier job after a normal Spark job causes the barrier job to run without a BarrierTaskContext. This is because, when a Python worker is reused, BarrierTaskContext._getOrCreate() will still return a plain TaskContext after a normal Spark job has been submitted first, and we'll get an `AttributeError: 'TaskContext' object has no attribute 'barrier'`. Fix this by adding check logic in BarrierTaskContext._getOrCreate() to make sure it returns a BarrierTaskContext in this scenario.

## How was this patch tested?

Add new UT in pyspark-core.

Closes apache#22962 from xuanyuanking/SPARK-25921.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit c00e72f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>