Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-33844][SQL][3.0] InsertIntoHiveDir command should check col name too #31038

Closed
wants to merge 1 commit into from

Conversation

AngersZhuuuu
Copy link
Contributor

What changes were proposed in this pull request?

In hive-1.2.1, hive serde just split serdeConstants.LIST_COLUMNS and serdeConstants.LIST_COLUMN_TYPES use comma.

When we use spark 2.4 with UT

  test("insert overwrite directory with comma col name") {
    withTempDir { dir =>
      val path = dir.toURI.getPath

      val v1 =
        s"""
           | INSERT OVERWRITE DIRECTORY '${path}'
           | STORED AS TEXTFILE
           | SELECT 1 as a, 'c' as b, if(1 = 1, "true", "false")
         """.stripMargin

      sql(v1).explain(true)

      sql(v1).show()
    }
  }

failed with as below since column name contains , then column names and column types size not equal.

19:56:05.618 ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter:  [ angerszhu ] Aborting job dd774f18-93fa-431f-9468-3534c7d8acda.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 5 elements while columns.types has 3 elements!
	at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
	at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
	at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
	at org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:119)
	at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:103)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:287)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:219)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:218)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$12.apply(Executor.scala:461)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:467)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

After hive-2.3 we will set COLUMN_NAME_DELIMITER to special char when col name cntains ',':
https://github.com/apache/hive/blob/6f4c35c9e904d226451c465effdc5bfd31d395a0/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L1180-L1188
https://github.com/apache/hive/blob/6f4c35c9e904d226451c465effdc5bfd31d395a0/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L1044-L1075

And in script transform, we parse column name to avoid this problem

private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = {
val columns = attrs.zipWithIndex.map(e => s"${e._1.prettyName}_${e._2}")
val columnTypes = attrs.map(_.dataType)
(columns, columnTypes)
}

So I think in InsertIntoHiveDirComman, we should do same thing too. And I have verified this method can make spark-2.4 work well.

Why are the changes needed?

More save use serde

Does this PR introduce any user-facing change?

No

How was this patch tested?

@SparkQA
Copy link

SparkQA commented Jan 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38263/

@SparkQA
Copy link

SparkQA commented Jan 5, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38263/

@AngersZhuuuu
Copy link
Contributor Author

FYI @cloud-fan

@SparkQA
Copy link

SparkQA commented Jan 5, 2021

Test build #133674 has finished for PR 31038 at commit e909b6e.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

GA passed, merging to 3.0, thanks!

@cloud-fan cloud-fan closed this Jan 5, 2021
cloud-fan pushed a commit that referenced this pull request Jan 5, 2021
…me too

### What changes were proposed in this pull request?

In hive-1.2.1, hive serde just split `serdeConstants.LIST_COLUMNS` and `serdeConstants.LIST_COLUMN_TYPES` use comma.

When we use spark 2.4 with UT
```
  test("insert overwrite directory with comma col name") {
    withTempDir { dir =>
      val path = dir.toURI.getPath

      val v1 =
        s"""
           | INSERT OVERWRITE DIRECTORY '${path}'
           | STORED AS TEXTFILE
           | SELECT 1 as a, 'c' as b, if(1 = 1, "true", "false")
         """.stripMargin

      sql(v1).explain(true)

      sql(v1).show()
    }
  }
```
failed with as below since column name contains `,` then column names and column types size not equal.
```
19:56:05.618 ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter:  [ angerszhu ] Aborting job dd774f18-93fa-431f-9468-3534c7d8acda.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 5 elements while columns.types has 3 elements!
	at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
	at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
	at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
	at org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:119)
	at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:103)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:287)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:219)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:218)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$12.apply(Executor.scala:461)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:467)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

```

After hive-2.3 we will set COLUMN_NAME_DELIMITER to special char when col name cntains ',':
https://github.com/apache/hive/blob/6f4c35c9e904d226451c465effdc5bfd31d395a0/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L1180-L1188
https://github.com/apache/hive/blob/6f4c35c9e904d226451c465effdc5bfd31d395a0/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L1044-L1075

And in script transform, we parse column name  to avoid this problem
https://github.com/apache/spark/blob/554600c2af0dbc8979955807658fafef5dc66c08/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala#L257-L261

So I think in `InsertIntoHiveDirComman`, we should do same thing too. And I have verified this method can make spark-2.4 work well.

### Why are the changes needed?
More save use serde

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Closes #31038 from AngersZhuuuu/SPARK-33844-3.0.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants