[SPARK-54410][SQL] Fix read support for the variant logical type annotation #53120
@@ -1593,6 +1593,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
+  val PARQUET_IGNORE_VARIANT_ANNOTATION =
+    buildConf("spark.sql.parquet.ignoreVariantAnnotation")
+      .internal()
+      .doc("When true, ignore the variant logical type annotation and treat the Parquet " +
+        "column in the same way as the underlying struct type")
+      .version("4.1.0")
Member: If this is a bug fix, this should be 4.0.2, @harshmotw-db and @cloud-fan.

Contributor: Not sure if the Parquet version we use in Spark 4.0 has the variant logical type. I'll leave it to @harshmotw-db.

Member: Thanks. We can continue our discussion if we are not sure. AFAIK, it means there is no regression at Apache Spark 4.1.0 from Apache Spark 4.0.0.

Member: For the record, if this is an improvement, it should be 4.2.0 according to the Apache Spark community policy, @harshmotw-db and @cloud-fan.

Contributor: Given Spark 4.1 has upgraded to a Parquet version that has the variant logical type, I think 4.1 should support reading Parquet files with native variant type fields?

Member: IIUC, we can say that it's still simply an unsupported feature, as it was in Apache Spark 4.0.0.

Contributor (Author): This PR practically is a fix already. It adds a temporary workaround for reading variant data, mainly for testing purposes (see this line). Essentially, the existing code behaves as if

Member: Why don't you revise the PR title so it literally reads like a fix, @harshmotw-db?

Contributor (Author): Also, the ParquetRowConverter fix is essential since currently, when

Contributor (Author): Sure, in practice it is a fix. I need to head out for an hour and I will change the PR title after that.
+      .booleanConf
+      .createWithDefault(false)
Member: When this should be

Contributor (Author): It's mainly for debugging purposes, if we need to extract the raw variant bytes by specifying the schema as, say,

Member: Well, for that purpose, let's remove this configuration. You can use

Contributor (Author): Correct me if I'm wrong, but I don't think logDebug would be helpful here if we want to extract variant columns into a custom schema in a Spark DataFrame. This config is a good tool to debug issues in a Parquet file.

Member: May I ask why you think that? You told me that

Contributor (Author): I have added a new test. If the config is disabled, which is the default, this read would give an error and you would need to read variant columns into a variant schema.
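For illustration, a minimal sketch of the debugging workflow described in this thread, assuming the physical variant group carries `value` and `metadata` binary fields and using a hypothetical file path:

```scala
// Sketch only: with the (internal) flag enabled, a variant-annotated column can be read
// back as a plain struct of its raw binary fields for inspection.
// The field names (value, metadata) and the path are assumptions for illustration.
spark.conf.set("spark.sql.parquet.ignoreVariantAnnotation", "true")
val raw = spark.read
  .schema("v STRUCT<value: BINARY, metadata: BINARY>")
  .parquet("/tmp/variant_annotated.parquet")
raw.show(truncate = false)  // inspect the raw variant bytes
```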
   val PARQUET_FIELD_ID_READ_ENABLED =
     buildConf("spark.sql.parquet.fieldId.read.enabled")
       .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers " +
@@ -5585,7 +5594,7 @@ object SQLConf {
         "When false, it only reads unshredded variant.")
       .version("4.0.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)

   val PUSH_VARIANT_INTO_SCAN =
     buildConf("spark.sql.variant.pushVariantIntoScan")
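The surrounding doc text suggests the entry whose default flips above is the shredded-variant read flag (`VARIANT_ALLOW_READING_SHREDDED`, referenced elsewhere in this diff). Assuming so, a job that wants the previous behaviour could presumably turn it back off at runtime:

```scala
// Assumption: VARIANT_ALLOW_READING_SHREDDED is the entry whose default becomes true above.
// SQLConf is Spark-internal API, so this is a sketch rather than a stable recipe.
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key, "false")
```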
@@ -7802,6 +7811,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def parquetAnnotateVariantLogicalType: Boolean = getConf(PARQUET_ANNOTATE_VARIANT_LOGICAL_TYPE)

+  def parquetIgnoreVariantAnnotation: Boolean = getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)
+
   def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)

   def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
@@ -58,15 +58,18 @@ class ParquetToSparkSchemaConverter(
     caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
     inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get,
     nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get,
-    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get) {
+    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get,
+    val ignoreVariantAnnotation: Boolean =
+      SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get) {

   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
     caseSensitive = conf.caseSensitiveAnalysis,
     inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
     nanosAsLong = conf.legacyParquetNanosAsLong,
-    useFieldId = conf.parquetFieldIdReadEnabled)
+    useFieldId = conf.parquetFieldIdReadEnabled,
+    ignoreVariantAnnotation = conf.parquetIgnoreVariantAnnotation)

   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
@@ -75,7 +78,9 @@ class ParquetToSparkSchemaConverter(
     inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
     nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean,
     useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key,
-      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get))
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get),
+    ignoreVariantAnnotation = conf.getBoolean(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key,
+      SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get))
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
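A minimal sketch of how the new constructor parameter might be wired when building the converter directly; the parameter name comes from this diff, while access to the class and the `convert` call are assumptions based on current Spark code:

```scala
// Sketch only: all other constructor parameters keep their defaults.
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter

val converter = new ParquetToSparkSchemaConverter(ignoreVariantAnnotation = true)
// Per the doc comment above, `convert` maps a Parquet MessageType to a Spark StructType;
// with the flag set, variant-annotated groups are treated as their underlying structs.
// val sparkSchema = converter.convert(parquetMessageType)  // parquetMessageType: hypothetical MessageType
```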
@@ -202,15 +207,17 @@ class ParquetToSparkSchemaConverter(
       case primitiveColumn: PrimitiveColumnIO => convertPrimitiveField(primitiveColumn, targetType)
       case groupColumn: GroupColumnIO if targetType.contains(VariantType) =>
         if (SQLConf.get.getConf(SQLConf.VARIANT_ALLOW_READING_SHREDDED)) {
-          val col = convertGroupField(groupColumn)
+          // We need the underlying file type regardless of the config.
+          val col = convertGroupField(groupColumn, ignoreVariantAnnotation = true)
           col.copy(sparkType = VariantType, variantFileType = Some(col))
         } else {
           convertVariantField(groupColumn)
         }
       case groupColumn: GroupColumnIO if targetType.exists(VariantMetadata.isVariantStruct) =>
-        val col = convertGroupField(groupColumn)
+        val col = convertGroupField(groupColumn, ignoreVariantAnnotation = true)
         col.copy(sparkType = targetType.get, variantFileType = Some(col))
-      case groupColumn: GroupColumnIO => convertGroupField(groupColumn, targetType)
+      case groupColumn: GroupColumnIO =>
+        convertGroupField(groupColumn, ignoreVariantAnnotation, targetType)
     }
   }
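For context, a hedged sketch of the read path this hunk serves: a variant-annotated Parquet column surfaces as a `VARIANT` column and can be queried with `variant_get` (available in recent Spark releases); the file path and the `$.a` field path are hypothetical:

```scala
// Sketch only: reading a file whose column "v" carries the variant logical type annotation.
import org.apache.spark.sql.functions.{col, variant_get}

val df = spark.read.parquet("/tmp/variant_annotated.parquet")
df.printSchema()                                       // v: variant (nullable = true)
df.select(variant_get(col("v"), "$.a", "int")).show()  // extract a typed field from the variant
```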
@@ -349,6 +356,7 @@ class ParquetToSparkSchemaConverter(
   private def convertGroupField(
       groupColumn: GroupColumnIO,
+      ignoreVariantAnnotation: Boolean,
       sparkReadType: Option[DataType] = None): ParquetColumn = {
     val field = groupColumn.getType.asGroupType()
@@ -373,9 +381,21 @@ class ParquetToSparkSchemaConverter(
     Option(field.getLogicalTypeAnnotation).fold(
       convertInternal(groupColumn, sparkReadType.map(_.asInstanceOf[StructType]))) {
-      // Temporary workaround to read Shredded variant data
-      case v: VariantLogicalTypeAnnotation if v.getSpecVersion == 1 && sparkReadType.isEmpty =>
-        convertInternal(groupColumn, None)
+      case v: VariantLogicalTypeAnnotation if v.getSpecVersion == 1 =>
+        if (ignoreVariantAnnotation) {
+          convertInternal(groupColumn)
Member: I don't understand the reason why we need to maintain this logic purely for debugging purposes.
+        } else {
+          ParquetSchemaConverter.checkConversionRequirement(
+            sparkReadType.forall(_.isInstanceOf[VariantType]),
+            s"Invalid Spark read type: expected $field to be variant type but found " +
+              s"${if (sparkReadType.isEmpty) { "None" } else { sparkReadType.get.sql }}")
+          if (SQLConf.get.getConf(SQLConf.VARIANT_ALLOW_READING_SHREDDED)) {
+            val col = convertInternal(groupColumn)
+            col.copy(sparkType = VariantType, variantFileType = Some(col))
+          } else {
+            convertVariantField(groupColumn)
+          }
+        }
       // A Parquet list is represented as a 3-level structure:
       //
Reviewer: Should we mark this conf as .internal()? I think the main use case is to simplify debugging issues with the raw variant bytes, but let me know if there's a reason for this conf that I'm missing. Assuming my understanding is right, maybe we can also mention the intended use case in the doc comment.

Reviewer: +1