Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-11080] [SQL] Incorporate per-JVM id into ExprId to prevent unsafe cross-JVM comparisions #9093

Closed
wants to merge 2 commits into from

Conversation

JoshRosen
Copy link
Contributor

In the current implementation of named expressions' ExprIds, we rely on a per-JVM AtomicLong to ensure that expression ids are unique within a JVM. However, these expression ids will not be globally unique. This opens the potential for id collisions if new expression ids happen to be created inside of tasks rather than on the driver.

There are currently a few cases where tasks allocate expression ids, which happen to be safe because those expressions are never compared to expressions created on the driver. In order to guard against the introduction of invalid comparisons between driver-created and executor-created expression ids, this patch extends ExprId to incorporate a UUID to identify the JVM that created the id, which prevents collisions.

@JoshRosen JoshRosen changed the title [SPARK-11080] Throw exception when NamedExpression.newExprId is called inside tasks [SPARK-11080] [SQL] Throw exception when NamedExpression.newExprId is called inside tasks Oct 13, 2015
@SparkQA
Copy link

SparkQA commented Oct 13, 2015

Test build #43632 has finished for PR 9093 at commit 48e3d1c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor Author

Digging into the test failures here, it looks like a bunch of them are caused by the BindReferences call inside of AggregationIterator:

[info]   Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree: hyperloglogplusplus(a#4,0.04)
[info]      at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
[info]      at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:315)
[info]      at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:280)
[info]      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:232)
[info]      at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:217)
[info]      at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:85)
[info]      at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:93)
[info]      at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.<init>(SortBasedAggregationIterator.scala:29)
[info]      at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$2.apply(SortBasedAggregate.scala:86)
[info]      at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$2.apply(SortBasedAggregate.scala:72)
[info]      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:700)
[info]      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:700)
[info]      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
[info]      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
[info]      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
[info]      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
[info]      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
[info]      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
[info]      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
[info]      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
[info]      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
[info]      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
[info]      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
[info]      at org.apache.spark.scheduler.Task.run(Task.scala:88)
[info]      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
[info]      ... 3 more
[info]   Caused by: java.lang.reflect.InvocationTargetException
[info]      at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[info]      at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
[info]      at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[info]      at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
[info]      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1$$anonfun$apply$10.apply(TreeNode.scala:326)
[info]      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1$$anonfun$apply$10.apply(TreeNode.scala:325)
[info]      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
[info]      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:323)
[info]      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:315)
[info]      at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
[info]      ... 27 more
[info]   Caused by: java.lang.IllegalStateException: Expression ids should not be allocated inside of tasks
[info]      at org.apache.spark.sql.catalyst.expressions.NamedExpression$.newExprId(namedExpressions.scala:30)
[info]      at org.apache.spark.sql.catalyst.expressions.AttributeReference$.apply$default$5(namedExpressions.scala:184)
[info]      at org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus$$anonfun$2.apply(functions.scala:546)
[info]      at org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus$$anonfun$2.apply(functions.scala:545)
[info]      at scala.collection.generic.GenTraversableFactory.tabulate(GenTraversableFactory.scala:149)
[info]      at org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus.<init>(functions.scala:545)
[info]      ... 37 more (QueryTest.scala:78)

Here, it looks like the copying of HyperLogLogPlus is causing problems because that ends up changing the expressionIds of its aggBufferAttributes.

@cloud-fan
Copy link
Contributor

Now what this PR did doesn't conform to the title, can you update title and description?

@JoshRosen
Copy link
Contributor Author

@cloud-fan, yep, planning to update shortly.

@SparkQA
Copy link

SparkQA commented Oct 13, 2015

Test build #43662 has finished for PR 9093 at commit 955a1a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ChildProcAppHandle implements SparkAppHandle
    • abstract class LauncherConnection implements Closeable, Runnable
    • final class LauncherProtocol
    • static class Message implements Serializable
    • static class Hello extends Message
    • static class SetAppId extends Message
    • static class SetState extends Message
    • static class Stop extends Message
    • class LauncherServer implements Closeable
    • class NamedThreadFactory implements ThreadFactory
    • class OutputRedirector
    • case class ExprId(id: Long, jvmId: UUID)

@JoshRosen
Copy link
Contributor Author

Alright, updating now....

@JoshRosen JoshRosen changed the title [SPARK-11080] [SQL] Throw exception when NamedExpression.newExprId is called inside tasks [SPARK-11080] [SQL] Incorporate per-JVM id into ExprId to prevent unsafe cross-JVM comparisions Oct 13, 2015
@JoshRosen
Copy link
Contributor Author

Updated; PTAL @marmbrus.

@marmbrus
Copy link
Contributor

LGTM pending tests

@JoshRosen
Copy link
Contributor Author

It already passed tests as of the latest commit.

@marmbrus
Copy link
Contributor

Merging to master.

@asfgit asfgit closed this in ef72673 Oct 13, 2015
@davies
Copy link
Contributor

davies commented Dec 11, 2015

Cherry-picked into branch-1.5, to fix https://issues.apache.org/jira/browse/SPARK-11885

asfgit pushed a commit that referenced this pull request Dec 11, 2015
…afe cross-JVM comparisions

In the current implementation of named expressions' `ExprIds`, we rely on a per-JVM AtomicLong to ensure that expression ids are unique within a JVM. However, these expression ids will not be _globally_ unique. This opens the potential for id collisions if new expression ids happen to be created inside of tasks rather than on the driver.

There are currently a few cases where tasks allocate expression ids, which happen to be safe because those expressions are never compared to expressions created on the driver. In order to guard against the introduction of invalid comparisons between driver-created and executor-created expression ids, this patch extends `ExprId` to incorporate a UUID to identify the JVM that created the id, which prevents collisions.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9093 from JoshRosen/SPARK-11080.
@JoshRosen JoshRosen deleted the SPARK-11080 branch December 15, 2015 17:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants