
[SPARK-45827][SQL] Add Variant data type in Spark. #43707

Closed
wants to merge 13 commits

Conversation

@chenhao-db (Contributor) commented Nov 7, 2023

What changes were proposed in this pull request?

This PR adds the Variant data type in Spark. It doesn't introduce the actual binary encoding yet; a Variant value simply carries two binaries, the value and the metadata.

This PR includes:

  • The in-memory Variant representation in the different types of Spark rows. All row types except UnsafeRow use the VariantVal object to store a Variant value. In UnsafeRow, the two binaries are stored contiguously.
  • Spark Parquet writer and reader support for the Variant type. This is agnostic to the detailed binary encoding and simply reads and writes the two binaries transparently.
  • A dummy Spark parse_json implementation so that the writer and reader can be tested manually. It currently returns a VariantVal whose value is the raw bytes of the input string and whose metadata is empty; this is not a valid Variant value under the final binary encoding (see the sketch below).
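
To make the dummy behaviour concrete, here is a minimal sketch (not the PR's code; the constructor argument order follows the snippet reviewed below, and the empty metadata is the placeholder described above):

```scala
import org.apache.spark.unsafe.types.VariantVal

// Dummy parse_json result as described above: the value binary is the raw
// UTF-8 bytes of the input JSON text and the metadata binary is empty.
// This is NOT a valid value under the final Variant binary encoding.
val v = new VariantVal("1".getBytes("UTF-8"), Array[Byte]())
```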

How was this patch tested?

Manual testing. Some supported usages:

> sql("create table T using parquet as select parse_json('1') as o")
> sql("select * from T").show
+---+
|  o|
+---+
|  1|
+---+
> sql("insert into T select parse_json('[2]') as o")
> sql("select * from T").show
+---+
|  o|
+---+
|[2]|
|  1|
+---+

@github-actions github-actions bot added the SQL label Nov 7, 2023
@github-actions github-actions bot added the DOCS label Nov 7, 2023
@chenhao-db chenhao-db changed the title Add Variant data type in Spark. [SPARK-45827] Add Variant data type in Spark. Nov 7, 2023
protected override def nullSafeEval(input: Any): Any = {
// A dummy implementation: the value is the raw bytes of the input string. This is not the final
// implementation, but only intended for debugging.
new VariantVal(input.asInstanceOf[UTF8String].toString.getBytes, Array())
Contributor:

we should probably implement the checkInputDataTypes method to enforce that the input is actually a UTF8String before we reach this point.
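
For illustration, a hedged sketch of what such a check could look like inside the expression class (this fragment is not from the PR; Spark expressions often express the same constraint by mixing in ExpectsInputTypes with inputTypes = Seq(StringType)):

```scala
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.types.StringType

// Hypothetical check: fail analysis unless the child is STRING, so that
// nullSafeEval can safely cast its input to UTF8String.
override def checkInputDataTypes(): TypeCheckResult = {
  if (child.dataType == StringType) {
    TypeCheckResult.TypeCheckSuccess
  } else {
    TypeCheckResult.TypeCheckFailure(
      s"parse_json expects a STRING argument, got ${child.dataType.catalogString}")
  }
}
```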

Contributor Author:

Contributor:

You're right, my mistake on this.

@@ -326,6 +327,17 @@ case class PhysicalStructType(fields: Array[StructField]) extends PhysicalDataTy
}
}

class PhysicalVariantType extends PhysicalDataType {
Contributor:

can we add a comment for this?

Contributor Author:

I don't feel there is anything special about this class that is worth a comment. None of the Physical*Type classes in this file have a comment.


* This function writes the binary content into {@code buffer} starting from {@code cursor}, as
* described in the class comment. The caller should guarantee there is enough space in `buffer`.
*/
public void writeIntoUnsafeRow(byte[] buffer, long cursor) {
Contributor:

Suggested change
public void writeIntoUnsafeRow(byte[] buffer, long cursor) {
public void writeIntoUnsafeRow(Object baseObject, long baseOffset, long cursor) {

Contributor:

There is no byte[] if Spark is using off-heap mode.

Contributor Author:

I'm not quite sure what you mean. This function is called by UnsafeWriter, which always uses byte[] to build an UnsafeRow. There is no benefit in changing this function to use a generic base object.

Contributor:

Interesting, UnsafeWriter always uses byte[], then it's fine.

Contributor:

I'm a bit confused. Why not add writeIntoUnsafeRow into UnsafeWriter?

Contributor (@cloud-fan, Nov 10, 2023):

Actually this makes sense to me. The job of writing to unsafe row belongs to UnsafeWriter and the code should be put there as well. What do you think? @chenhao-db

Contributor:

But it will introduce extra call-stack depth.

Contributor Author:

It makes sense. The original reason I put writeIntoUnsafeRow in this class was to avoid code duplication (e.g., readFromUnsafeRow is called by two classes; such duplication is quite common for other physical value types). But since writeIntoUnsafeRow only has one caller, UnsafeWriter, it is okay to just put it in UnsafeWriter.
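
For readers following the thread, a hedged sketch of the contiguous layout that writeIntoUnsafeRow produces, inferred from the reader-side snippet below (a 4-byte value length, then the value bytes, then the metadata bytes); the signature and helper name are illustrative, not the merged code:

```scala
import org.apache.spark.unsafe.Platform

// Illustrative write path for the layout [valueSize: int][value bytes][metadata bytes].
// The caller must have reserved 4 + value.length + metadata.length bytes at `cursor`.
def writeVariantIntoBuffer(value: Array[Byte], metadata: Array[Byte],
                           buffer: Array[Byte], cursor: Long): Unit = {
  Platform.putInt(buffer, cursor, value.length)
  Platform.copyMemory(value, Platform.BYTE_ARRAY_OFFSET, buffer, cursor + 4, value.length)
  Platform.copyMemory(metadata, Platform.BYTE_ARRAY_OFFSET,
    buffer, cursor + 4 + value.length, metadata.length)
}
```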

int metadataSize = totalSize - 4 - valueSize;
byte[] value = new byte[valueSize];
byte[] metadata = new byte[metadataSize];
Platform.copyMemory(
Contributor:

We can avoid the copy if VariantVal follows UTF8String and also represents its data as baseObject + baseOffset. Does Java have a nicer way to do this now? cc @rednaxelafx

Contributor Author:

I know how UTF8String works, but I feel it is simpler to keep byte[] in the VariantVal object instead of baseObject + baseOffset. I prefer to start with this version; it is not something that can't be changed in the future.

"""
Examples:
""",
since = "3.4.0",
Contributor:

4.0.0

@cloud-fan (Contributor):

Can we check FileFormat#supportDataType and make sure only Parquet supports it?
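
A hedged sketch of what such a FileFormat#supportDataType override might look like on the Parquet side; the exact cases are assumptions, not the merged change:

```scala
import org.apache.spark.sql.types._

// Hypothetical Parquet-side check: accept Variant (including nested occurrences),
// while other formats keep rejecting it through their own supportDataType.
override def supportDataType(dataType: DataType): Boolean = dataType match {
  case _: VariantType => true
  case st: StructType => st.fields.forall(f => supportDataType(f.dataType))
  case ArrayType(elementType, _) => supportDataType(elementType)
  case MapType(keyType, valueType, _) =>
    supportDataType(keyType) && supportDataType(valueType)
  case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
  case _: AtomicType => true
  case _ => false
}
```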

@cloud-fan (Contributor):

We should also check all the call sites of DataSource#disallowWritingIntervals and also disallow writing variants to DS v1.
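
Similarly, a hedged sketch of the DS v1 guard being suggested, reusing the cannotSaveVariantIntoExternalStorageError helper added in this PR (the helper placement and the recursion are assumptions):

```scala
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType, VariantType}

// Hypothetical guard at a DataSource v1 write path: reject schemas that contain
// a Variant column anywhere, mirroring how disallowWritingIntervals is used.
private def disallowWritingVariants(schema: StructType): Unit = {
  def hasVariant(dt: DataType): Boolean = dt match {
    case _: VariantType => true
    case st: StructType => st.fields.exists(f => hasVariant(f.dataType))
    case ArrayType(elementType, _) => hasVariant(elementType)
    case MapType(keyType, valueType, _) => hasVariant(keyType) || hasVariant(valueType)
    case _ => false
  }
  if (schema.fields.exists(f => hasVariant(f.dataType))) {
    throw QueryCompilationErrors.cannotSaveVariantIntoExternalStorageError()
  }
}
```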

Contributor @beliefer left a comment:

Is there any test?

import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.unsafe.types.VariantVal

class VariantSuite extends QueryTest with SharedSparkSession {
Contributor:

@beliefer this is the test.


// At this point, JSON parsing logic is not really implemented. We just construct some number
// inputs that are also valid JSON. This exercises passing VariantVal throughout the system.
val query = spark.sql("select parse_json(repeat('1', id)) as v from range(1, 10)")
Contributor:

We don't need a fake function to get variant values. Since we have also updated the encoder code, I think Seq(VariantVal(...)).toDF("col") should work. If not, we need to check the encoder code.

Contributor Author:

This is not really "fake": when the JSON parsing is implemented, this test will still be valid.

Contributor Author:

I would like to keep it, and add some round-trip tests with spark.createDataFrame(spark.sparkContext.parallelize(rows), schema).
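
A hedged sketch of that round-trip idea, assuming a SparkSession named spark as in VariantSuite (the schema and assertion are illustrative, not the test that was actually added):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, VariantType}
import org.apache.spark.unsafe.types.VariantVal

// Put a VariantVal into a row, round-trip it through a DataFrame, and compare
// via debugString (byte-wise semantics, as discussed elsewhere in this PR).
val expected = new VariantVal(Array[Byte](1, 2, 3), Array[Byte](-1, -2, -3, -4))
val schema = StructType(Seq(StructField("v", VariantType)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(expected))), schema)
val actual = df.collect().head.get(0).asInstanceOf[VariantVal]
assert(actual.debugString() == expected.debugString())
```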


/**
* The data type representing semi-structured values with arbitrary hierarchical data structures. At
* this moment, it is intended to store parsed JSON values and almost any other data types in the
Member:

This doesn't sound like a @Stable API at all. Let's replace it with at least @Unstable.

* The data type representing semi-structured values with arbitrary hierarchical data structures. At
* this moment, it is intended to store parsed JSON values and almost any other data types in the
* system (e.g., we don't plan to let it store a map with a non-string key type). In the future, we
* may also extend it to store other semi-structured data representation like XML.
Member:

I think we should rewrite this statement here. The API documentation shouldn't really mention something like "at this moment" or "in the future". Should just say which version supports what. Therefore, should mention the version @since too.

usage = "_FUNC_(jsonStr) - Parse a JSON string as an Variant value. Throw an exception when the string is not valid JSON value.",
examples =
"""
Examples:
Member:

Should add examples.


override def dataType: DataType = VariantType

override def nullable: Boolean = false
Member:

Hm, it implements nullSafeEval so I guess it's nullable?

Contributor:

it at least should be child.nullable

Contributor Author:

it at least should be child.nullable

This is correct, and I don't need to override nullable in this case.

nullSafeEval ignores null inputs, and the expression itself never returns null for non-null inputs.

@HyukjinKwon HyukjinKwon changed the title [SPARK-45827] Add Variant data type in Spark. [SPARK-45827][SQL] Add Variant data type in Spark. Nov 10, 2023
assert(rowWriter.getRow.getVariant(0) === null)
val variant = new VariantVal(Array[Byte](1, 2, 3), Array[Byte](-1, -2, -3, -4))
rowWriter.write(1, variant)
assert(rowWriter.getRow.getVariant(1).debugString() == variant.debugString())
Contributor:

shall we override equals and hashCode for VariantVal?

Contributor Author:

I don't think we should have them at this moment: the equivalence of VariantVal is much more complicated than byte-to-byte comparison (i.e., different binary values can represent the same variant). It will be pretty complex and will depend on the detailed encoding. It will be confusing if we have equals and hashCode as byte-to-byte comparisons.

In my mind, when we are testing the result of VariantVal in the future, we should use semantic equivalence rather than byte-to-byte comparison. This test is really a special case because we want to verify we exactly pass the VariantVal throughout the system. We can add something like equalByBytes, but it doesn't make things any simpler, so I choose to compare the debugString.

@HyukjinKwon (Member) commented Nov 10, 2023

Please create an umbrella JIRA, add this (SPARK-45827) as a sub-task, and add some more sub-tasks if possible (e.g., Python, R support, documentation). Adding a new type needs a huge change to provide proper support (e.g., see SPARK-27790).

class VariantType private () extends AtomicType {
// The default size is used in query planning to drive optimization decisions. 2048 is arbitrarily
// picked and we currently don't have any data to support it. This may need revisiting later.
override def defaultSize: Int = 2048
Contributor:

How do we get the actual length cheaply?

Contributor:

This is a default size for the variant type, not for a certain variant value. StringType has the same thing.

Contributor:

Got it.

@@ -808,6 +809,9 @@ object FunctionRegistry {
expression[LengthOfJsonArray]("json_array_length"),
expression[JsonObjectKeys]("json_object_keys"),

// Variant
expression[ParseJson]("parse_json"),
Contributor (@beliefer, Nov 10, 2023):

Could we implement parse_json in another PR?

Contributor Author:

I will have an actual implementation for parse_json in the future. At this point, I think it is beneficial to include a "fake" implementation to help testing and experimenting.

@@ -1857,6 +1857,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("dataType" -> field.dataType.catalogString))
}

def cannotSaveVariantIntoExternalStorageError(): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1176",
Contributor:

Shall we give it a useful name?

Contributor Author:

Contributor:

That's a legacy from history, this is a new one.

Contributor Author:

Makes sense, I updated the error class.

Contributor @beliefer left a comment:

LGTM except some comments.


override private[sql] def ordering =
throw QueryExecutionErrors.orderedOperationUnsupportedByDataTypeError(
"PhysicalVariantType")
Contributor:

Shall we support sorting in the future?

Contributor Author:

I think so, but probably not soon. I also added a TODO.

@@ -1564,6 +1564,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map.empty)
}

def cannotSaveVariantIntoExternalStorageError(): Throwable = {
new AnalysisException(
errorClass = "CANNOT_SAVE_VARIANT",
Contributor:

.map(_.get(0).asInstanceOf[VariantVal].toString)
.sorted
.toSeq
val expected = (1 until 10).map(id => "1" * id)
Contributor:

I mean: put `val expected = (1 until 10).map(id => "1" * id)` outside of verifyResult.

Contributor Author:

I don't think it matters much.

Contributor:

The current code creates expected twice.

Contributor Author:

I'm aware of that, but the cost is really negligible, and I actually like the current code more because the value is created close to where it is used.

@HyukjinKwon (Member):

Merged to master.

cloud-fan pushed a commit that referenced this pull request Nov 16, 2023
## What changes were proposed in this pull request?

This is a follow-up of #43707. The previous PR missed a piece in the variant parquet reader: we are treating the variant type as `struct<value binary, metadata binary>`, so it also needs a similar `assembleStruct` process in the Parquet reader to correctly set the nullness of variant values from def/rep levels.
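
For reference, a sketch of the physical Parquet schema that description implies; the field names follow the commit message, and the nullability defaults are assumptions:

```scala
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}

// A Variant column is materialized in Parquet as a struct of two binary fields,
// which is why the reader needs the same assembleStruct handling as real structs.
val variantPhysicalSchema = StructType(Seq(
  StructField("value", BinaryType),
  StructField("metadata", BinaryType)))
```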

## How was this patch tested?

Extend the existing unit test. It would fail without the change.

Closes #43825 from chenhao-db/fix_variant_parquet_reader.

Authored-by: Chenhao Li <chenhao.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Nov 21, 2023
### What changes were proposed in this pull request?

Add a small fix for #43707. Since Variant is represented in columnar form as a struct, it must use `StructNullableTypeConverter` so that nulls are set properly in child column vectors.

### Why are the changes needed?

Fixes a potential issue when setting nulls in Variant columns.

### Does this PR introduce _any_ user-facing change?

No, Variant is not released yet.

### How was this patch tested?

Updated existing unit test to test Variant. It fails without the fix.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43911 from cashmand/SPARK-45827-fixnulls.

Authored-by: cashmand <david.cashman@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ueshin pushed a commit that referenced this pull request Feb 26, 2024
### What changes were proposed in this pull request?

The Variant datatype was added in #43707 but the equivalent PySpark type was not added. In this PR we add Variant to PySpark which allows us to create PySpark dataframes containing the Variant type.

### Why are the changes needed?

Without this PR, trying to create a dataframe containing a variant type results in
`AssertionError: Undefined error message parameter for error class: CANNOT_PARSE_DATATYPE. Parameters: {'error': "Undefined error message parameter for error class: CANNOT_PARSE_DATATYPE. Parameters: {'error': 'variant'}"}`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new PySpark type tests involving Variant.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45131 from desmondcheongzx/variant-pyspark-type-info.

Authored-by: Desmond Cheong <desmond.cheong@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
TakawaAkirayo pushed a commit to TakawaAkirayo/spark that referenced this pull request Mar 4, 2024
ericm-db pushed a commit to ericm-db/spark that referenced this pull request Mar 5, 2024