
[SPARK-32511][SQL] Add dropFields method to Column class #29322

Closed
wants to merge 7 commits into from

Conversation

fqaiser94
Contributor

@fqaiser94 fqaiser94 commented Jul 31, 2020

What changes were proposed in this pull request?

Added a new dropFields method to the Column class.
This method should allow users to drop a StructField in a StructType column (with similar semantics to the drop method on Dataset).
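As a rough sketch of the intended semantics (the SparkSession setup and example data here are illustrative assumptions, not code from this PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// A single struct column `a` with two fields, x and y.
val df = spark.sql("SELECT named_struct('x', 1, 'y', 2) AS a")

// Dataset.drop removes a whole top-level column:
df.drop("a")

// The proposed Column.dropFields removes a field *inside* a struct column,
// leaving the sibling fields in place:
df.withColumn("a", $"a".dropFields("y"))
// resulting schema: a: struct<x: int>
```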

Why are the changes needed?

Spark users often have to work with deeply nested data, e.g. to fix a data-quality issue in an existing StructField. To do this with the existing Spark APIs, users have to rebuild the entire struct column.

For example, let's say you have the following deeply nested data structure which has a data quality issue (5 is missing):

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val data = spark.createDataFrame(sc.parallelize(
      Seq(Row(Row(Row(1, 2, 3), Row(Row(4, null, 6), Row(7, 8, 9), Row(10, 11, 12)), Row(13, 14, 15))))),
      StructType(Seq(
        StructField("a", StructType(Seq(
          StructField("a", StructType(Seq(
            StructField("a", IntegerType),
            StructField("b", IntegerType),
            StructField("c", IntegerType)))),
          StructField("b", StructType(Seq(
            StructField("a", StructType(Seq(
              StructField("a", IntegerType),
              StructField("b", IntegerType),
              StructField("c", IntegerType)))),
            StructField("b", StructType(Seq(
              StructField("a", IntegerType),
              StructField("b", IntegerType),
              StructField("c", IntegerType)))), 
            StructField("c", StructType(Seq(
              StructField("a", IntegerType),
              StructField("b", IntegerType),
              StructField("c", IntegerType))))
          ))), 
          StructField("c", StructType(Seq(
            StructField("a", IntegerType),
            StructField("b", IntegerType),
            StructField("c", IntegerType))))
        )))))).cache

data.show(false)
+-------------------------------------------------------------+
|a                                                            |
+-------------------------------------------------------------+
|[[1, 2, 3], [[4,, 6], [7, 8, 9], [10, 11, 12]], [13, 14, 15]]|
+-------------------------------------------------------------+

Currently, to drop the missing value users would have to do something like this:

val result = data.withColumn("a", 
  struct(
    $"a.a", 
    struct(
      struct(
        $"a.b.a.a", 
        $"a.b.a.c"
      ).as("a"), 
      $"a.b.b", 
      $"a.b.c"
    ).as("b"), 
    $"a.c"
  ))

result.show(false)
+------------------------------------------------------------+
|a                                                           |
+------------------------------------------------------------+
|[[1, 2, 3], [[4, 6], [7, 8, 9], [10, 11, 12]], [13, 14, 15]]|
+------------------------------------------------------------+

As you can see above, with the existing methods users must call the struct function and list all fields, including fields they don't want to change. This is not ideal because:

  • it leads to complex, fragile code that cannot survive schema evolution (see SPARK-16483).
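To make the fragility concrete: suppose the source schema later evolves and a hypothetical new field a.b.a.d appears upstream. Because the hand-written rebuild enumerates every field explicitly, the new field is silently lost:

```scala
// Hypothetical schema evolution: upstream adds a new field a.b.a.d.
// The manual rebuild must be updated by hand, otherwise the new field
// silently disappears from the result:
val rebuilt = data.withColumn("a",
  struct(
    $"a.a",
    struct(
      struct($"a.b.a.a", $"a.b.a.c").as("a"), // a.b.a.d would be dropped here
      $"a.b.b",
      $"a.b.c"
    ).as("b"),
    $"a.c"
  ))
// By contrast, dropFields("b.a.b") removes only the field it names and
// keeps everything else, including fields added after the code was written.
```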

In contrast, with the method added in this PR, a user could simply do something like this to get the same result:

val result = data.withColumn("a", 'a.dropFields("b.a.b"))
result.show(false)
+------------------------------------------------------------+
|a                                                           |
+------------------------------------------------------------+
|[[1, 2, 3], [[4, 6], [7, 8, 9], [10, 11, 12]], [13, 14, 15]]|
+------------------------------------------------------------+
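If the method accepts varargs, i.e. dropFields(fieldNames: String*) (an assumption about the final signature, not stated above), several fields could be dropped in one call:

```scala
// Assumes dropFields(fieldNames: String*); field names use dot notation
// for nesting, relative to the struct column the method is called on.
val trimmed = data.withColumn("a", $"a".dropFields("b.a.b", "c"))
// would drop the nested field a.b.a.b and the top-level struct field a.c
```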

This is the second of perhaps three methods that could be added to the Column class to make it easier to manipulate nested data.
Other methods under discussion in SPARK-22231 include withFieldRenamed.
However, that should be added in a separate PR.

Does this PR introduce any user-facing change?

Only one minor change. If the user submits the following query:

df.withColumn("a", $"a".withField(null, null))

instead of throwing:

java.lang.IllegalArgumentException: requirement failed: fieldName cannot be null

it will now throw:

java.lang.IllegalArgumentException: requirement failed: col cannot be null

I don't believe this change should be an issue because:

  • neither message is incorrect
  • Spark 3.1.0 has yet to be released

but please feel free to correct me if I am wrong.

How was this patch tested?

New unit tests were added.

Related JIRAs:

More discussion on this topic can be found here:

@fqaiser94
Contributor Author

cc @cloud-fan @dbtsai @viirya

@dbtsai
Member

dbtsai commented Aug 2, 2020

Jenkins, test this please.

@dbtsai
Member

dbtsai commented Aug 2, 2020

Jenkins, add to whitelist.

@SparkQA

SparkQA commented Aug 2, 2020

Test build #126944 has finished for PR 29322 at commit 19587e8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fqaiser94
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 3, 2020

Test build #126947 has finished for PR 29322 at commit 19587e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 4, 2020

Test build #127062 has finished for PR 29322 at commit 7342514.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class RegExpExtractAll(subject: Expression, regexp: Expression, idx: Expression)
  • .doc("The name of a class that implements " +
  • trait CachedBatch
  • trait CachedBatchSerializer extends Serializable
  • trait SimpleMetricsCachedBatch extends CachedBatch
  • abstract class SimpleMetricsCachedBatchSerializer extends CachedBatchSerializer with Logging
  • case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport
  • case class ApplyColumnarRulesAndInsertTransitions(
  • class ColumnStatisticsSchema(a: Attribute) extends Serializable
  • class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable
  • case class DefaultCachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
  • class DefaultCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer

@SparkQA

SparkQA commented Aug 4, 2020

Test build #127063 has finished for PR 29322 at commit 948fc9c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fqaiser94 fqaiser94 changed the base branch from master to branch-0.5 August 12, 2020 00:27
@fqaiser94 fqaiser94 changed the base branch from branch-0.5 to master August 12, 2020 00:27
@SparkQA

SparkQA commented Aug 12, 2020

Test build #127351 has finished for PR 29322 at commit ad111ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@cloud-fan cloud-fan left a comment

LGTM except a few minor comments

@fqaiser94
Contributor Author

@cloud-fan thanks for your review! Also, could you remove the CORE, R, and WEB UI labels from this PR please? They were added incorrectly by the bot when I merged with master.

@SparkQA

SparkQA commented Aug 13, 2020

Test build #127390 has finished for PR 29322 at commit 2b0ac34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fqaiser94
Contributor Author

retest this please

@cloud-fan
Contributor

The last commit already passed Jenkins; I'm merging it to master, thanks!

@cloud-fan cloud-fan closed this in 0c850c7 Aug 13, 2020
@SparkQA

SparkQA commented Aug 13, 2020

Test build #127394 has finished for PR 29322 at commit 2b0ac34.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Hi, @cloud-fan . Could you update the Apache Jira issue, SPARK-32511, according to your revert, please?

@cloud-fan
Contributor

reopened
