From 3dbab92491e617ee1549595219152bcaa2c3534e Mon Sep 17 00:00:00 2001
From: cashmand
Date: Tue, 11 Jun 2024 13:58:39 -0400
Subject: [PATCH] Avoid storage amplification.

---
 .../variant/variantExpressions.scala          |  8 +++-
 .../org/apache/spark/sql/VariantSuite.scala   | 43 +++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
index 26ab2f08b6443..b80fb11b6813b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
@@ -322,7 +322,13 @@ case object VariantGet {
       }
     }
 
-    if (dataType == VariantType) return new VariantVal(v.getValue, v.getMetadata)
+    if (dataType == VariantType) {
+      // Build a new variant, in order to strip off any unnecessary metadata.
+      val builder = new VariantBuilder
+      builder.appendVariant(v)
+      val result = builder.result()
+      return new VariantVal(result.getValue, result.getMetadata)
+    }
     val variantType = v.getType
     if (variantType == Type.NULL) return null
     dataType match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index caab98b6239a0..0c00676607dd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -584,4 +584,47 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
       checkEvaluation(castVariantExpr, sqlVariantExpr.eval())
     }
   }
+
+  test("variant_get size") {
+    val largeKey = "x" * 1000
+    val df = Seq(s"""{ "$largeKey": {"a" : 1 },
+      "b" : 2,
+      "c": [1,2,3,{"$largeKey": 4}] }""").toDF("json")
+      .selectExpr("parse_json(json) as v")
+
+    // Check Variant with approximate bounds to avoid flakiness if we make minor format changes.
+    def checkSize(v: VariantVal, minMetadata: Long, maxMetadata: Long,
+                  minValue: Long, maxValue: Long): Unit = {
+      val mSize = v.getMetadata.length
+      assert(mSize >= minMetadata)
+      assert(mSize <= maxMetadata)
+      val vSize = v.getValue.length
+      assert(vSize >= minValue)
+      assert(vSize <= maxValue)
+    }
+
+    // The full Variant has large metadata (but only one copy of `largeKey`).
+    checkSize(df.selectExpr("variant_get(v, '$', 'variant')").collect()(0)
+      .getAs[VariantVal](0), 1000, 1050, 20, 40)
+    // Extracting Variant or a nested type containing Variant should strip out the large metadata.
+    checkSize(df.selectExpr("variant_get(v, '$.b', 'variant')").collect()(0)
+      .getAs[VariantVal](0), 2, 4, 2, 4)
+    // Behavior is the same without an explicit cast to Variant.
+    checkSize(df.selectExpr("variant_get(v, '$.b')").collect()(0)
+      .getAs[VariantVal](0), 2, 4, 2, 4)
+    checkSize(df.selectExpr(s"variant_get(v, '$$.$largeKey', 'variant')").collect()(0)
+      .getAs[VariantVal](0), 5, 10, 5, 10)
+    checkSize(df.selectExpr(s"variant_get(v, '$$.$largeKey', 'struct<a:variant>')")
+      .collect()(0).getStruct(0).getAs[VariantVal](0), 2, 4, 2, 4)
+    // Only the array element that contains `largeKey` should be large.
+    checkSize(df.selectExpr("variant_get(v, '$.c', 'array<variant>')").collect()(0)
+      .getSeq[VariantVal](0)(0), 2, 4, 2, 4)
+    checkSize(df.selectExpr("variant_get(v, '$.c', 'array<variant>')").collect()(0)
+      .getSeq[VariantVal](0)(3), 1000, 1020, 5, 10)
+    // Cast to a nested type containing Variant should also remove metadata.
+    val structResult = df.selectExpr(s"cast(v as struct<$largeKey:variant,b:variant>)").collect()(0)
+      .getStruct(0)
+    checkSize(structResult.getAs[VariantVal](0), 5, 10, 5, 10)
+    checkSize(structResult.getAs[VariantVal](1), 2, 4, 2, 4)
+  }
 }
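
A minimal sketch of the effect this patch has, assuming a spark-shell session
(which imports spark.implicits._ by default) on a build that includes the
change; the key, sizes, and printed output are illustrative assumptions, not
part of the patch:

    import org.apache.spark.unsafe.types.VariantVal

    // A variant whose metadata dictionary holds one very large key.
    val largeKey = "x" * 1000
    val df = Seq(s"""{ "$largeKey": 1, "b": 2 }""").toDF("json")
      .selectExpr("parse_json(json) as v")

    // Extract the tiny field `b` as a variant. Before this patch the result
    // reused the input's full metadata, so it still carried a copy of
    // `largeKey`; with the VariantBuilder rebuild, the result's metadata
    // contains only the keys the extracted value actually references.
    val small = df.selectExpr("variant_get(v, '$.b', 'variant')")
      .collect()(0).getAs[VariantVal](0)
    println(small.getMetadata.length) // a few bytes rather than ~1000

Rebuilding via VariantBuilder.appendVariant copies only the dictionary entries
the value actually references, which is what avoids the storage amplification
named in the commit subject when extracted variants are persisted.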