[SPARK-48898][SQL] Add Variant shredding functions#48779

Closed
cashmand wants to merge 7 commits into apache:master from cashmand:SPARK-48898-write-shredding

Conversation

@cashmand
Contributor

@cashmand cashmand commented Nov 6, 2024

What changes were proposed in this pull request?

This is a first step towards adding Variant shredding support for the Parquet writer. It adds functionality to convert a Variant value to an InternalRow that matches the current shredding spec in apache/parquet-format#461.

Once this merges, the next step will be to set up the Parquet writer to accept a shredding schema, and write these InternalRow values to Parquet instead of the raw Variant binary.
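As a rough illustration (not code from this PR), the shredded row for a value with an expected object schema of struct<a: int, b: string> would follow the spec's metadata/value/typed_value layout, which in Spark terms could look something like the StructType below. The field names a and b are purely illustrative.

import org.apache.spark.sql.types._

// Hypothetical shredding schema for an expected object type struct<a: int, b: string>.
// metadata holds the Variant metadata bytes, value holds any residual (unshredded) data,
// and typed_value has one value/typed_value pair per shredded field.
val shreddingSchema: StructType = StructType(Seq(
  StructField("metadata", BinaryType, nullable = false),
  StructField("value", BinaryType, nullable = true),
  StructField("typed_value", StructType(Seq(
    StructField("a", StructType(Seq(
      StructField("value", BinaryType, nullable = true),
      StructField("typed_value", IntegerType, nullable = true)
    ))),
    StructField("b", StructType(Seq(
      StructField("value", BinaryType, nullable = true),
      StructField("typed_value", StringType, nullable = true)
    )))
  )), nullable = true)
))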

Why are the changes needed?

First step towards adding support for shredding, which can improve Variant performance (and will be important for functionality on the read side once other tools begin writing shredded Variant columns to Parquet).

Does this PR introduce any user-facing change?

No, none of this code is currently called outside of the added tests.

How was this patch tested?

Unit tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Nov 6, 2024
VariantBuilder variantBuilder = new VariantBuilder(false);
ArrayList<VariantBuilder.FieldEntry> fieldEntries = new ArrayList<>();
// Keep track of which schema fields we actually found in the Variant value.
Set<String> presentKeys = new HashSet<String>();
Contributor

I think it is more efficient if you use the nullness of shreddedValues[i] to determine whether a field is present. You may need an ordered list of fields in VariantSchema to use this approach.

Contributor Author

Okay, I was on the fence about how much effort to put into efficiency here.

I modified the interface so that the shredding schema constructor takes an array of name-schema pairs. I like this better than the map in any case: my code was assuming that the indices in the map were dense and corresponded to the array returned in the shredded result, and this makes that much clearer. I now construct the name->field mapping in the constructor. Let me know what you think.

I didn't bother trying to create an ordered list of fields; I just iterate over the array a second time to fill in nulls if not all fields matched. Sorting the list and stepping through it and the Variant object together could be a bit more efficient, but it adds more complexity and branching to the loop, and I'm not sure it would be significantly faster than a simple second pass over the array to fill in null values. Let me know if you had something different in mind.
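Roughly, the two-pass shape being described could look like the sketch below (illustrative only, not the PR's code: the method name, the use of Option, and the simplified field representation are all stand-ins). The first pass drops each field of the Variant object into its schema slot via a name -> index map, or into a residual list if it isn't in the shredding schema; the second pass marks the schema fields that never appeared.

import scala.collection.mutable.ArrayBuffer

def shredObjectSketch(
    numSchemaFields: Int,
    fieldIndex: Map[String, Int],                    // schema field name -> slot index
    variantFields: Seq[(String, Any)]): (Array[Option[Any]], Seq[(String, Any)]) = {
  val shredded = new Array[Option[Any]](numSchemaFields)   // starts out all null
  val residual = ArrayBuffer.empty[(String, Any)]
  // Pass 1: place each field of the Variant object.
  for ((name, value) <- variantFields) {
    fieldIndex.get(name) match {
      case Some(i) => shredded(i) = Some(value)            // field is in the shredding schema
      case None    => residual.append((name, value))       // leftover field, kept in `value`
    }
  }
  // Pass 2: any slot still null is a schema field that was missing from this value.
  for (i <- shredded.indices if shredded(i) == null) {
    shredded(i) = None
  }
  (shredded, residual.toSeq)
}

The question in this thread is essentially whether merging two sorted field lists would beat that second pass; the simple version keeps the loop body small at the cost of one extra scan of the array.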

@HyukjinKwon HyukjinKwon changed the title [SPARK-48898] Add Variant shredding functions [SPARK-48898][SQL] Add Variant shredding functions Nov 6, 2024
@cashmand cashmand requested a review from chenhao-db November 6, 2024 20:53
@Zouxxyy
Contributor

Zouxxyy commented Nov 8, 2024

Exciting to see shredding being pushed forward! So if I understand correctly, the shredding write chain may be like this:
Get the expected shredded schema (DataType) in some way (sampling, or just user-defined?) -> the Parquet writer accepts the variant + shredded schema -> cast to a shredded InternalRow -> write to the Parquet file (the actual column type is the group type corresponding to the shredded DataType).

From this perspective, consider integration with lake formats: a lake format generally has its own reader or writer, so I feel like it may still receive the raw variant data, and the same shredding logic would have to be implemented in the format's writer. If you have any other ideas on this, I'd love to hear your perspective, thanks!

@cashmand
Contributor Author

cashmand commented Nov 8, 2024

So if I understand correctly, the shredding write chain may be like this:
Get the expected shredded schema (DataType) in some way (sampling, or just user-defined?) -> the Parquet writer accepts the variant + shredded schema -> cast to a shredded InternalRow -> write to the Parquet file (the actual column type is the group type corresponding to the shredded DataType).

Yes, exactly. For initial implementation and testing, I plan to set the schema explicitly. I think sampling makes sense as a better user experience in the long term, but needs some thought about the best way to implement it.

From this perspective, consider integration with lake formats: a lake format generally has its own reader or writer, so I feel like it may still receive the raw variant data, and the same shredding logic would have to be implemented in the format's writer. If you have any other ideas on this, I'd love to hear your perspective, thanks!

What do you mean by "lake format"? Are you referring to formats like Iceberg or Delta? I made an effort in this PR to keep the shredding logic in common/variant, and created an interface (ShreddedResult and ShreddedResultBuilder) that Spark implements to construct the InternalRow that it uses in its Parquet writer. The intent is that other writers could implement the interface to match their data types, and still reuse the same code for the shredding logic. Eventually, we can separate common/variant into its own Java library (and maybe move it to the Parquet project, which is where the Variant spec is moving to), to make it easier for other JVM-based writers to use the Variant implementation outside of Spark.

* the shredding scheme:
* struct<
* metadata: binary,
* value: binary,
Contributor

is the value duplicated? In addition to the typed_value, we also store the raw value for each typed field, right?

Contributor Author

No, we don't duplicate the value. This value field is used if the original value is of the wrong type (in this example, not an object). The other case where we use value is if the typed_value is an object, and the value is also an object, but contains fields that aren't in the typed_value schema. E.g. in this example, if we tried to shred the value {"a": 1, "b": 2, "c": 3}, then we'd put the values of a and b in the struct, and would write the object {"c": 3} to value.

By the way, the struct name object in this example is wrong. We've simplified the spec to always call the field typed_value regardless of the shredding type. I'll update this example.
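To make that concrete in terms of the three columns (illustrative values only; in the real output the value column holds Variant-encoded bytes, not a JSON string, and typed_value is an InternalRow rather than a Map):

// Shredding {"a": 1, "b": 2, "c": 3} against a typed_value schema of struct<a: int, b: int>.
// "c" is not in the shredding schema, so it ends up in the residual `value` column.
val shreddedColumns: Map[String, Any] = Map(
  "metadata"    -> "<original Variant metadata bytes>",
  "value"       -> """{"c": 3}""",
  "typed_value" -> Map(
    "a" -> Map("value" -> null, "typed_value" -> 1),
    "b" -> Map("value" -> null, "typed_value" -> 2))
)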

Contributor

@gene-db gene-db left a comment

@cashmand Thanks for this feature! I left several questions/comments.

],
"sqlState" : "22023"
},
"INVALID_VARIANT_SCHEMA" : {
Contributor

NIT: Should we name this INVALID_VARIANT_SHREDDING_SCHEMA? Or, is a shredding schema the only type of Variant schema that can exist?

Contributor Author

I can't really think of a different invalid schema, but I guess it doesn't hurt to rename it to be a bit clearer.

});
}

// Get the dictionary ID for the object field at the `index` slot. Throws malformedVariatn if
Contributor

Suggested change:
- // Get the dictionary ID for the object field at the `index` slot. Throws malformedVariatn if
+ // Get the dictionary ID for the object field at the `index` slot. Throws malformedVariant if

return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata);
}

// Return the variant value only, without metadata.
Contributor

Is this ok to run before result(), and/or after result()?

Contributor Author

Yes, either is fine, although the expectation is that it's run instead of result(). I'll add a comment.

// building an object during shredding, where there is a fixed pre-existing metadata that
// all shredded values will refer to.
public void shallowAppendVariant(Variant v) {
int size = valueSize(v.value, v.pos);
Contributor

This looks basically like the code in https://github.com/apache/spark/pull/48779/files#diff-397d59a94179cafec72d53368f75ffdb825315cb839a87a789db4521650659c0R414

I wonder if we could pull out that logic, and have both locations call the common function?

public static final class TimestampNTZType extends ScalarType {
}

public final int typedIdx;
Contributor

Can you comment on what these indexes represent? What do they index into? What are valid values for these?

Contributor Author

Okay, I'll add a comment. I don't really use them in the common/variant code. For Spark, they represent the position of the corresponding value within the InternalRow.
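A hypothetical illustration of that mapping, assuming a top-level row laid out as metadata, value, typed_value and a negative index for an absent field (consistent with the topLevelMetadataIdx >= 0 check later in this diff); ShreddingIndexes is a made-up name, not a class in the PR:

case class ShreddingIndexes(topLevelMetadataIdx: Int, variantIdx: Int, typedIdx: Int)

// Top-level schema struct<metadata, value, typed_value>: all three fields are present.
val topLevel = ShreddingIndexes(topLevelMetadataIdx = 0, variantIdx = 1, typedIdx = 2)

// Nested shredded field with only value/typed_value: metadata is absent, so its index is -1.
val nestedField = ShreddingIndexes(topLevelMetadataIdx = -1, variantIdx = 0, typedIdx = 1)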

}

if (isTopLevel) {
StructType(StructField("metadata", BinaryType, nullable = true) +: fields)
Contributor

Is the top-level metadata nullable? I thought the metadata always had some bytes.

Contributor Author

You're right, I'll change it.
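For reference, a minimal sketch of the corrected top-level field under that change (the surrounding schema construction stays as in the diff above):

import org.apache.spark.sql.types.{BinaryType, StructField}

// Top-level Variant metadata always has bytes, so the field should not be nullable.
val metadataField = StructField("metadata", BinaryType, nullable = false)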

}
}

if (topLevel != (topLevelMetadataIdx >= 0)) {
Contributor

We are translating the StructType to the VariantSchema, so we don't actually need topLevel. Are we using topLevel only for validation of variantShreddingSchema?

Contributor Author

Yes, it's just for validation. In the long term, I wasn't sure if we'd necessarily use variantShreddingSchema, or if data sources would directly provide the full shredding schema, so I think it's good to validate here.

// Result is stored as an InternalRow.
val row = new GenericInternalRow(schema.numFields)

override def addArray(schema: VariantSchema,
Contributor

Will this schema be different from the class SparkShreddedResult(schema: VariantSchema) schema? Must this function schema be one of the sub-schemas of the class schema? I'm not sure how the class schema and the parameter schemas are different/same or how they are used.

Contributor Author

Oh, that's a good point. I don't think it's necessary to pass the schema into the methods. I'll try to remove it.

/**
* Converts an input variant into shredded components. Returns the shredded result, as well
* as the original Variant with shredded fields removed.
* `dataType` must be a valid shredding schema, as described in common/variant/shredding.md.
Contributor

Should we point to the parquet spec doc?

VariantSchema.ObjectField[] objectSchema = schema.objectSchema;
ShreddedResult[] shreddedValues = new ShreddedResult[objectSchema.length];

// Create a variantBuilder for any mismatched fields.
Contributor

What exactly does "mismatched fields" mean? Could it mean type mismatches?

Or, is this just for any variant fields which are not found in the shredding schema?

Contributor Author

The latter. I'll reword it to clarify.

Contributor Author

@cashmand cashmand left a comment

Thanks for the review, @gene-db! I updated the PR based on your comments.

public final int typedIdx;
public final int variantIdx;
public final int topLevelMetadataIdx;
Contributor Author

Yes, I'll add a comment.

public final int typedIdx;
public final int variantIdx;
public final int topLevelMetadataIdx;
public final int numFields;
Contributor Author

It will always be the first thing - i.e. 1, 2 or 3 depending on how many of value, typed_value and metadata are in the schema.

}

/**
* Given an expected schema of a Variant value, returns a suitable schema for shredding, by
Contributor Author

VariantSchema is a concept in the common/variant code, and represents a shredding schema (possibly one that is nested as an array element or object field in a larger shredding schema).

StructType is a Spark type. It happens to be used as the Spark implementation to represent both VariantSchema and a shredded object in such a schema.
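As an illustration of that nesting (types and names are illustrative, assuming an array-of-long field): a nested level of the shredding StructType carries only value and typed_value, while metadata appears once, at the top level.

import org.apache.spark.sql.types._

// Hypothetical shredding StructType for the element of an array<long> field. Unlike the
// top level, a nested level has no metadata column.
val elementShreddingSchema: StructType = StructType(Seq(
  StructField("value", BinaryType, nullable = true),
  StructField("typed_value", LongType, nullable = true)))

// The array itself is then shredded as an array of these element structs.
val arrayShreddingSchema = ArrayType(elementShreddingSchema, containsNull = false)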

val arrayShreddingSchema =
ArrayType(variantShreddingSchema(elementType, false), containsNull)
Seq(
StructField("value", BinaryType, nullable = true),
Contributor Author

Okay, I'll put it in VariantSchema, since it's part of the spec.

@cashmand cashmand requested a review from gene-db November 12, 2024 19:26
Contributor

@gene-db gene-db left a comment

@cashmand Thanks for these functions!

LGTM

public static final class TimestampNTZType extends ScalarType {
}

// The index of the typed_value, value, and metadata fields in the schema, respectively. I given
Contributor

Suggested change:
- // The index of the typed_value, value, and metadata fields in the schema, respectively. I given
+ // The index of the typed_value, value, and metadata fields in the schema, respectively. If given

@cloud-fan
Contributor

The docker test failure is unrelated, thanks, merging to master!

@cloud-fan cloud-fan closed this in 3c3d1a6 Nov 13, 2024
@adriangb

adriangb commented Apr 6, 2025

Hi folks, sorry to bother you, could some of you who are familiar with these bits of Spark please post example files to https://github.com/apache/parquet-testing so that non-Spark devs can start working on the format in other implementations? Thanks!
