
[SPARK-12813][SQL] Eliminate serialization for back to back operations #10747

Closed
wants to merge 5 commits into apache:master from marmbrus:encoderExpressions

Conversation

4 participants
@marmbrus
Contributor

commented Jan 13, 2016

The goal of this PR is to eliminate unnecessary translations when there are back-to-back MapPartitions operations (a toy sketch of the idea follows the lists below). In order to achieve this I also made the following simplifications:

  • Operators no longer hold encoders; instead they have only the expressions that they need. The benefits here are twofold: the expressions are visible to transformations, so they go through the normal resolution/binding process, and now that they are visible we can change them on a case-by-case basis.
  • Operators no longer have type parameters. Since the engine is responsible for its own type checking, having the types visible to the compiler was an unnecessary complication. We still leverage the Scala compiler in the companion factory when constructing a new operator, but after this the types are discarded.

Deferred to a follow up PR:

  • Remove as much of the resolution/binding from Dataset/GroupedDataset as possible. We should still eagerly check resolution, though, and throw an error in the case of mismatches for an as operation.
  • Eliminate serializations in more cases by adding more cases to EliminateSerialization
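
To make the optimization concrete, here is a minimal, self-contained Scala sketch of the idea. The types and the rule below are toy stand-ins, not the actual Catalyst MapPartitions operator or the EliminateSerialization rule in this PR: when one map-partitions operator feeds directly into another, the serialize/deserialize round trip between them can be cut out.

// Toy model, not Spark code: each MapPartitionsOp is assumed to deserialize
// its input rows into objects, apply `func`, and re-serialize the results.
trait Plan
case class Source(name: String) extends Plan
case class MapPartitionsOp(func: Iterator[Any] => Iterator[Any], child: Plan) extends Plan

// When two MapPartitionsOps are back to back, fuse them so objects produced
// by the inner function flow straight into the outer one, skipping the
// intermediate serialization.
def eliminateBackToBack(plan: Plan): Plan = plan match {
  case MapPartitionsOp(outer, MapPartitionsOp(inner, child)) =>
    eliminateBackToBack(MapPartitionsOp(iter => outer(inner(iter)), child))
  case MapPartitionsOp(f, child) =>
    MapPartitionsOp(f, eliminateBackToBack(child))
  case other =>
    other
}

The rule in this PR does not literally fuse the functions; as the withObjectOutput discussion below shows, it keeps both operators and lets the child expose its deserialized object directly, with the same net effect of removing the intermediate row conversion.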

@SparkQA


commented Jan 14, 2016

Test build #49353 has finished for PR 10747 at commit 4615c96.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ObjectOperator extends LogicalPlan
    • case class MapPartitions(
    • case class AppendColumns(
    • case class MapGroups(
    • case class CoGroup(
    • trait ObjectOperator extends SparkPlan
    • case class MapPartitions(
    • case class AppendColumns(
    • case class MapGroups(
    • case class CoGroup(
@marmbrus

Contributor Author

commented Jan 14, 2016

test this please

@SparkQA


commented Jan 14, 2016

Test build #49360 has finished for PR 10747 at commit ee7f3c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
test("back to back MapPartitions") {
  val input = LocalRelation('_1.int, '_2.int)
  val plan =
    MapPartitions(func,


rxin Jan 14, 2016

Contributor

Should we have a test case that tests a plan that cannot be eliminated?


marmbrus Jan 14, 2016

Author Contributor

Oh yeah, I guess I forgot to push it.


marmbrus Jan 14, 2016

Author Contributor

There's a test here and an end-to-end one in DatasetSuite now.
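
For intuition only, here is what a "cannot be eliminated" case looks like in the toy model sketched under the PR description (this reuses those Plan/MapPartitionsOp/eliminateBackToBack definitions and is not the actual test in EliminateSerializationSuite): the negative case is simply a plan where the two operators are not adjacent, which the rule must leave unchanged.

// Hypothetical extra node standing in for anything that separates the two
// map-partitions operators in the toy model above.
case class Barrier(child: Plan) extends Plan

val inc: Iterator[Any] => Iterator[Any] = _.map { case i: Int => i + 1 }

// Back to back: the sketch rule fuses these into a single operator over the source.
val collapsible = MapPartitionsOp(inc, MapPartitionsOp(inc, Source("t")))
eliminateBackToBack(collapsible) match {
  case MapPartitionsOp(_, Source(_)) => ()  // fused as expected
  case other => sys.error(s"expected a single fused operator, got $other")
}

// Not back to back: something sits between the operators, so nothing should change.
val notCollapsible = MapPartitionsOp(inc, Barrier(MapPartitionsOp(inc, Source("t"))))
assert(eliminateBackToBack(notCollapsible) == notCollapsible)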

val outputObject = generateToRow(serializer)

iter.map { row =>
  val newColumns = outputObject(func(getObject(row)))


cloud-fan Jan 14, 2016

Contributor

How do we ensure newColumns is an UnsafeRow?


marmbrus Jan 14, 2016

Author Contributor

Yeah, that's a good point. It is only safe because we only use append columns to feed into aggregation, so we'll only ever need an unsafe row. We could make this more general, but I'm not sure it's worth it given that this will likely get rewritten in the near future for codegen anyway.

encoderFor[U],
encoderFor[U].schema.toAttributes,
logicalPlan))
MapPartitions[T, U](func, logicalPlan))


cloud-fan Jan 14, 2016

Contributor

This is different from the previous version: we only pass the type parameter T to MapPartitions and build a new encoder there, which is unresolved, while before this PR we passed a resolvedTEncoder. Do we break the life cycle of the encoder in this PR?


marmbrus Jan 14, 2016

Author Contributor

This is just pushing the lifecycle of the encoder into the analyzer / physical operators where it belongs.

Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")()

def withObjectOutput: LogicalPlan = if (output.head.dataType.isInstanceOf[ObjectType]) {
  this


cloud-fan Jan 14, 2016

Contributor

When will we go to this branch?


marmbrus Jan 14, 2016

Author Contributor

When the output is already in the form of an object (instead of a serialized row).


marmbrus Jan 14, 2016

Author Contributor

I'll add some comments to this trait.


cloud-fan Jan 14, 2016

Contributor

Ah, got it: after the back-to-back optimization, we may produce an object in the serializer.
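
For readers following along, here is a rough paraphrase of the branch being discussed, written against Catalyst classes purely for illustration (the function name, the Project at the end, and the parameter shapes are assumptions, not the PR's exact withObjectOutput code): if the operator already outputs a single object column it can be consumed as-is; otherwise the object that the serializer reads from is exposed under an alias such as "obj", mirroring the Alias(...) expression in the snippet above.

import org.apache.spark.sql.catalyst.expressions.{Alias, BoundReference, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.ObjectType

// Sketch only: `plan` stands for the object operator and `serializer` for its
// serializer expressions.
def withObjectOutputSketch(plan: LogicalPlan, serializer: Seq[NamedExpression]): LogicalPlan =
  if (plan.output.head.dataType.isInstanceOf[ObjectType]) {
    plan  // already produces the deserialized object; nothing to strip
  } else {
    // Recover the object the serializer was reading and expose it as "obj".
    val obj = Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")()
    Project(obj :: Nil, plan)
  }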


/**
* A relation produced by applying `func` to each partition of the `child`, concatenating the
* resulting columns at the end of the input row. tEncoder/uEncoder are used respectively to


cloud-fan Jan 14, 2016

Contributor

update javadoc

@SparkQA


commented Jan 14, 2016

Test build #49404 has finished for PR 10747 at commit ecde6e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
    input: Expression,
    serializer: Seq[NamedExpression],
    child: LogicalPlan) extends UnaryNode with ObjectOperator {
  override def output: Seq[Attribute] = serializer.map(_.toAttribute)


cloud-fan Jan 14, 2016

Contributor

Can we just use serializer.map(_.toAttribute.newInstance) here? Then we don't need to add NamedExpression.newInstance.


marmbrus Jan 14, 2016

Author Contributor

That would return different expression IDs any time the function was called, whereas we want to fix the expression IDs when the NamedExpression is created.


cloud-fan Jan 14, 2016

Contributor

ah yes
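
As a small illustration of the point about expression IDs (this snippet just exercises Catalyst's AttributeReference and is not part of the PR): newInstance() mints a fresh ExprId on every call, so computing output that way would give different attribute ids each time it was evaluated.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("obj", IntegerType)()
val b = a.newInstance()
// Every call to newInstance() allocates a new ExprId, hence the concern above.
assert(a.exprId != b.exprId)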

@SparkQA


commented Jan 15, 2016

Test build #49417 has finished for PR 10747 at commit c34aacf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
  extends LeafExpression with NamedExpression {


cloud-fan Jan 15, 2016

Contributor

Unrelated question: why does BoundReference extend NamedExpression?


marmbrus Jan 15, 2016

Author Contributor

It's kind of a hack, but sometimes after transforms we end up with BoundReferences in place of fields that were AttributeReferences, and so there were class cast exceptions. We might be able to remove this someday, or maybe even now?

@cloud-fan

Contributor

commented Jan 15, 2016

LGTM

@marmbrus

Contributor Author

commented Jan 15, 2016

Thanks for reviewing! Merging to master.

asfgit closed this in cc7af86 on Jan 15, 2016

marmbrus deleted the marmbrus:encoderExpressions branch on Mar 8, 2016
