Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-43321][Connect] Dataset#Joinwith #40997

Closed
wants to merge 10 commits into from

Conversation

zhenlineo
Copy link
Contributor

@zhenlineo zhenlineo commented Apr 28, 2023

What changes were proposed in this pull request?

Impl missing method JoinWith with Join relation operation

The JoinWith adds left and right struct type info in the Join relation proto.

Why are the changes needed?

Missing Dataset API

Does this PR introduce any user-facing change?

Yes. Added the missing Dataset#JoinWith method

How was this patch tested?

E2E tests.

.setRight(other.plan.getRoot)
.setJoinType(joinTypeValue)
.setJoinCondition(condition.expr)
.setLeftSchema(DataTypeProtoConverter.toConnectProtoType(this.schema))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to avoid the RPCs here? The input schema(s) can actually change between the schema call and the actual materialization of the plan. IMO it would be better to state in the proto we want the join to produce a different result shape, instead of explicitly sending the schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to pass one info to the server: whether the top level of the encoder is a struct or not. I feel it might look more strange to pass a boolean here isTopLevelStruct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but passing the encoders only tells the server that a different output schema is expected, i.e. struct(left.*), struct(right.*) instead of left.*, right.*. There is a precedent in SQL called NATURAL JOIN. I would probably add a field to the Join message called leftAndRightAsStruct or something in that vain.

Copy link
Contributor Author

@zhenlineo zhenlineo May 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% sure I understand. Looked into the NATURAL JOIN, we computed the joinResultStruct directly. I do not see an easy way to do the same on the client, so I changed the code to pass struct(left.*, right.*) now.

throw new IllegalArgumentException(s"Unsupported join type `joinType`: $e")
}

val tupleEncoder = ProductEncoder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This opens up an interesting can of Worms. The problem is basically that we don't know the encoder for a Dataframe. If you joinWith a DataFrame (Dataset[Row]) with a Dataset[E] you will end up with a Dataset[(Row, E)], the problem is that we don't know the schema for Row and we infer this from the schema in SparkResult, this does not work in the joinWith scenario...

Copy link
Contributor Author

@zhenlineo zhenlineo May 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The worm can is sealed. I added some extra logic in SparkResult to convert each UnboundRowEncoder to a new encoder inferred from the row schema.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The can of worms is only closed if the UnboundRowEncoders are only nested in Products/Rows :)... Fine for now though.

@zhenlineo zhenlineo force-pushed the joinwith branch 5 times, most recently from 98f47ab to 0e2079f Compare May 2, 2023 18:46
@@ -252,6 +252,18 @@ class Dataset[T] private[sql] (
}
}

private def containsUnboudRowEncoder(enc: AgnosticEncoder[_]): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we address other nested structures in a follow-up? Granted this covers 90% of the cases. If we end up having a lot of communality with the code in SparkResult, then I'd suggest moving this into a single place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added SPARK-44228 to address the nested row encoders all together.

}

def isRowStruct(enc: AgnosticEncoder[_]): Boolean =
enc.dataType.isInstanceOf[StructType] && !enc.isInstanceOf[OptionEncoder[_]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just have a parent encoder for all struct encoders, and check for that? I feel that this is overly complex.

true
} else {
enc match {
case ProductEncoder(clsTag, fields) if ProductEncoder.isTuple(clsTag) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory any 'struct' encoder can contain an UnboundRowEncoder. This only deals with the stuff we can currently create in scala client. That is fine, but then we should document this limitation somewhere.

case ProductEncoder(clsTag, fields) if ProductEncoder.isTuple(clsTag) =>
// Recursively continue updating the tuple product encoder
val schema = dataType.asInstanceOf[StructType]
assert(fields.length == schema.fields.length)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to hold. I think fields.length <= schema.fields.length needs to hold.


// Optional. set to true if the left is a struct that can be flattened to a row. Only used by
// JoinWith where the left is a dataset.
optional bool is_left_row_struct = 6;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just make this one flag?

@@ -719,3 +720,99 @@ case class CoGroup(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): CoGroup = copy(left = newLeft, right = newRight)
}

object JoinWith {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you file an follow-up so we can move this into the analyzer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added SPARK-44225

@zhenlineo zhenlineo force-pushed the joinwith branch 5 times, most recently from 34c5a67 to fa23dfc Compare June 29, 2023 21:37
fields: Seq[EncoderField])
extends AgnosticEncoder[K] {
// Contains a sequence of fields. The fields can be flattened to columns in a row.
trait FieldsEncoder[K] extends AgnosticEncoder[K] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StructEncoder?

// (Optional) Only used by joinWith. Set the left and right join data types.
optional JoinDataType join_data_type = 6;

message JoinDataType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably the right thing to do for the wrong reason. The problem that you seem to be solving is dealing with structs, you need to know when you should wrap the input in a struct. However this could easily be solved by looking at the left & right input schemas (if more than 1 column make this a struct).

The problem that you are actually solving is that when you have applied a single value encoder to a dataset with multiple columns. The single encoder will bind to the first column. JoinWith should not project the remaining columns. This is not something we can infer from the serverside schema.


message JoinDataType {
// If the left data type is a struct that can be flatten to a row.
bool is_left_flattenable_to_row = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd just name this is_left_struct or is_left_single_value. Flattening is a detail we don't really care about.

The same applies to the right side.

Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hvanhovell
Copy link
Contributor

Merging. Please address the naming issues in a follow-up.

@hvanhovell hvanhovell closed this in 9c7978d Jul 6, 2023
yaooqinn pushed a commit that referenced this pull request Jul 13, 2023
…Client joinWith

### What changes were proposed in this pull request?
Directly use `struct` in the names to directly call out the underlying encoder is composed using a struct encoder.

### Why are the changes needed?
Addressing PR review feedbacks of #40997

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

Closes #41971 from zhenlineo/followup-joinwith.

Authored-by: Zhen Li <zhenlineo@users.noreply.github.com>
Signed-off-by: Kent Yao <yao@apache.org>
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
### What changes were proposed in this pull request?
Impl missing method JoinWith with Join relation operation

The JoinWith adds `left` and `right` struct type info in the Join relation proto.
### Why are the changes needed?
Missing Dataset API

### Does this PR introduce _any_ user-facing change?
Yes. Added the missing Dataset#JoinWith method

### How was this patch tested?
E2E tests.

Closes apache#40997 from zhenlineo/joinwith.

Authored-by: Zhen Li <zhenlineo@users.noreply.github.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
…Client joinWith

### What changes were proposed in this pull request?
Directly use `struct` in the names to directly call out the underlying encoder is composed using a struct encoder.

### Why are the changes needed?
Addressing PR review feedbacks of apache#40997

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

Closes apache#41971 from zhenlineo/followup-joinwith.

Authored-by: Zhen Li <zhenlineo@users.noreply.github.com>
Signed-off-by: Kent Yao <yao@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants