Branch 3.0 [v3.0.2] [sql] [union all] One of the SQL returns (number of output rows: 0) Cause the whole sql to die#35081
Closed
…iInstanceRelation.newInstance" This reverts commit 89443ab.
### What changes were proposed in this pull request?
When the Ruby version is 3.0, `jekyll serve` fails with:
```
yi.zhu$ SKIP_API=1 jekyll serve --watch
Configuration file: /Users/yi.zhu/Documents/project/Angerszhuuuu/spark/docs/_config.yml
Source: /Users/yi.zhu/Documents/project/Angerszhuuuu/spark/docs
Destination: /Users/yi.zhu/Documents/project/Angerszhuuuu/spark/docs/_site
Incremental build: disabled. Enable with --incremental
Generating...
done in 5.085 seconds.
Auto-regeneration: enabled for '/Users/yi.zhu/Documents/project/Angerszhuuuu/spark/docs'
------------------------------------------------
Jekyll 4.2.0 Please append `--trace` to the `serve` command
for any additional information or backtrace.
------------------------------------------------
<internal:/usr/local/Cellar/ruby/3.0.0_1/lib/ruby/3.0.0/rubygems/core_ext/kernel_require.rb>:85:in `require': cannot load such file -- webrick (LoadError)
from <internal:/usr/local/Cellar/ruby/3.0.0_1/lib/ruby/3.0.0/rubygems/core_ext/kernel_require.rb>:85:in `require'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/jekyll-4.2.0/lib/jekyll/commands/serve/servlet.rb:3:in `<top (required)>'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/jekyll-4.2.0/lib/jekyll/commands/serve.rb:179:in `require_relative'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/jekyll-4.2.0/lib/jekyll/commands/serve.rb:179:in `setup'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/jekyll-4.2.0/lib/jekyll/commands/serve.rb:100:in `process'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/jekyll-4.2.0/lib/jekyll/command.rb:91:in `block in process_with_graceful_fail'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/jekyll-4.2.0/lib/jekyll/command.rb:91:in `each'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/jekyll-4.2.0/lib/jekyll/command.rb:91:in `process_with_graceful_fail'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/jekyll-4.2.0/lib/jekyll/commands/serve.rb:86:in `block (2 levels) in init_with_program'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/mercenary-0.4.0/lib/mercenary/command.rb:221:in `block in execute'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/mercenary-0.4.0/lib/mercenary/command.rb:221:in `each'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/mercenary-0.4.0/lib/mercenary/command.rb:221:in `execute'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/mercenary-0.4.0/lib/mercenary/program.rb:44:in `go'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/mercenary-0.4.0/lib/mercenary.rb:21:in `program'
from /Users/yi.zhu/.gem/ruby/3.0.0/gems/jekyll-4.2.0/exe/jekyll:15:in `<top (required)>'
from /usr/local/bin/jekyll:23:in `load'
from /usr/local/bin/jekyll:23:in `<main>'
```
This issue is solved in jekyll/jekyll#8523
### Why are the changes needed?
Fix build issue
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Not needed.
Closes #31263 from AngersZhuuuu/SPARK-34181.
Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit faa4f0c)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…te availability
This is a long-standing bug that has existed since the ambiguous self-join check was introduced. A column reference is not ambiguous if it can only come from one join side (e.g. the other side has a project that picks only a few columns). An example is
```
Join(b#1 = 3)
TableScan(t, [a#0, b#1])
Project(a#2)
TableScan(t, [a#2, b#3])
```
It's a self-join, but `b#1` is not ambiguous because it can't come from the right side, which only has column `a`.
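The rule described above can be sketched in a few lines of Scala. This is a toy model, not Spark's analyzer code: `Side`, `isAmbiguous`, and the use of plain column names in place of expression IDs are all assumptions made for illustration.

```scala
// Toy model of the self-join ambiguity rule described above.
// `Side` and `isAmbiguous` are hypothetical names, not Spark APIs.
object SelfJoinAmbiguity {
  // The columns of the underlying table that one join side still exposes.
  final case class Side(outputColumns: Set[String])

  // A reference is ambiguous only if both sides of the self-join expose the column.
  def isAmbiguous(column: String, left: Side, right: Side): Boolean =
    left.outputColumns.contains(column) && right.outputColumns.contains(column)

  def main(args: Array[String]): Unit = {
    val left = Side(Set("a", "b")) // models TableScan(t, [a, b])
    val right = Side(Set("a"))     // models Project(a) over the same table
    println(isAmbiguous("b", left, right)) // false
    println(isAmbiguous("a", left, right)) // true
  }
}
```

In this model a reference to `b` resolves without ambiguity because only the left side exposes it, while `a` would still need to be disambiguated.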
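### Why are the changes needed?
To avoid failing valid self-join queries.
### Does this PR introduce _any_ user-facing change?
Yes, as a bug fix.
### How was this patch tested?
A new test.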
Closes #31287 from cloud-fan/self-join.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit b8a6906)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…rom internal environment in HiveExternalCatalogVersionsSuite
### What changes were proposed in this pull request?
`HiveExternalCatalogVersionsSuite` can't run in an organization's internal environment where access to the outside internet is not allowed, because the suite downloads the Spark release package from the internet.
Similar to SPARK-32998, this PR adds an environment variable, `SPARK_RELEASE_MIRROR`, so that users can specify an accessible download address for the Spark release package and run `HiveExternalCatalogVersionsSuite` in such an environment.
### Why are the changes needed?
To let `HiveExternalCatalogVersionsSuite` run in an organization's internal environment without relying on an external Spark release download address.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass the Jenkins or GitHub Action
- Manual test with and without the environment variable set, in an internal environment that can't access the internet.

Execute:
```
mvn clean install -Dhadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -PhPhive -pl sql/hive -am -DskipTests
mvn clean install -Dhadoop-3.2 -Phive-2.3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -PhPhive -pl sql/hive -DwildcardSuites=org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite -Dtest=none
```
**Without env**
```
HiveExternalCatalogVersionsSuite:
19:50:35.123 WARN org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite: Failed to download Spark 3.0.1 from https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz: Network is unreachable (connect failed)
19:50:35.126 WARN org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite: Failed to download Spark 3.0.1 from https://dist.apache.org/repos/dist/release/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz: Network is unreachable (connect failed)
org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite *** ABORTED ***
Exception encountered when invoking run on a nested suite - Unable to download Spark 3.0.1 (HiveExternalCatalogVersionsSuite.scala:125)
Run completed in 2 seconds, 669 milliseconds.
Total number of tests run: 0
Suites: completed 1, aborted 1
Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
```
**With env**
```
export SPARK_RELEASE_MIRROR=${spark-release.internal.com}/dist/release/
```
```
HiveExternalCatalogVersionsSuite
- backward compatibility
Run completed in 1 minute, 32 seconds.
Total number of tests run: 1
Suites: completed 2, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
Closes #31294 from LuciferYang/SPARK-34202.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit e48a8ad)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
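A mirror lookup of this kind could look roughly like the sketch below. It is not the suite's actual code: `candidateUrls` is a hypothetical helper, and both the "mirror first, then the defaults" ordering and the `spark/spark-<version>/` layout under the mirror are assumptions inferred from the log output and the `export` line.

```scala
// Sketch only: `ReleaseMirror.candidateUrls` is a hypothetical helper.
object ReleaseMirror {
  // Default download locations, taken from the "Without env" log output.
  private val defaultMirrors = List(
    "https://archive.apache.org/dist",
    "https://dist.apache.org/repos/dist/release")

  // Assumption: the mirror from SPARK_RELEASE_MIRROR is tried first,
  // followed by the default locations.
  def candidateUrls(version: String, env: Map[String, String] = sys.env): List[String] = {
    val mirrors = env.get("SPARK_RELEASE_MIRROR").toList ++ defaultMirrors
    val fileName = s"spark-$version-bin-hadoop3.2.tgz"
    mirrors.map { mirror =>
      val base = mirror.stripSuffix("/") // tolerate a trailing slash in the mirror URL
      s"$base/spark/spark-$version/$fileName"
    }
  }
}
```

Passing the environment as a parameter keeps the helper easy to exercise with and without the variable set, which mirrors the two manual test runs.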
… SQL Server's spatial types
### What changes were proposed in this pull request?
This PR backports SPARK-33813 (#31283).
This PR fixes the issue that reading tables which contain spatial datatypes from MS SQL Server fails.
MS SQL Server supports two non-standard spatial JDBC types, `geometry` and `geography`, but Spark SQL can't handle them:
```
java.sql.SQLException: Unrecognized SQL type -157
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getCatalystType(JdbcUtils.scala:251)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$getSchema$1(JdbcUtils.scala:321)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:366)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:355)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:355)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:381)
```
Considering what the [data type mapping](https://docs.microsoft.com/ja-jp/sql/connect/jdbc/using-basic-data-types?view=sql-server-ver15) says, I think those spatial types can be mapped to Catalyst's `BinaryType`.
### Why are the changes needed?
To provide better support.
### Does this PR introduce _any_ user-facing change?
Yes. MS SQL Server users can use `geometry` and `geography` types in datasource tables.
### How was this patch tested?
New test case added to `MsSqlServerIntegrationSuite`.
Closes #31290 from sarutak/SPARK-33813-branch-3.0.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Invoke `CatalogImpl.refreshTable()` instead of `SessionCatalog.refreshTable` in the v1 implementation of the `LOAD DATA` command. `SessionCatalog.refreshTable` only refreshes metadata, whereas `CatalogImpl.refreshTable()` refreshes cached table data as well.
### Why are the changes needed?
The example below illustrates the issue:
- Create a source table:
```sql
spark-sql> CREATE TABLE src_tbl (c0 int, part int) USING hive PARTITIONED BY (part);
spark-sql> INSERT INTO src_tbl PARTITION (part=0) SELECT 0;
spark-sql> SHOW TABLE EXTENDED LIKE 'src_tbl' PARTITION (part=0);
default src_tbl false Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/load-data-refresh-cache/spark-warehouse/src_tbl/part=0
...
```
- Load data from the source table to a cached destination table:
```sql
spark-sql> CREATE TABLE dst_tbl (c0 int, part int) USING hive PARTITIONED BY (part);
spark-sql> INSERT INTO dst_tbl PARTITION (part=1) SELECT 1;
spark-sql> CACHE TABLE dst_tbl;
spark-sql> SELECT * FROM dst_tbl;
1 1
spark-sql> LOAD DATA LOCAL INPATH '/Users/maximgekk/proj/load-data-refresh-cache/spark-warehouse/src_tbl/part=0' INTO TABLE dst_tbl PARTITION (part=0);
spark-sql> SELECT * FROM dst_tbl;
1 1
```
The last query does not return the newly loaded data.
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the example above works correctly:
```sql
spark-sql> LOAD DATA LOCAL INPATH '/Users/maximgekk/proj/load-data-refresh-cache/spark-warehouse/src_tbl/part=0' INTO TABLE dst_tbl PARTITION (part=0);
spark-sql> SELECT * FROM dst_tbl;
0 0
1 1
```
### How was this patch tested?
Added new test to `org.apache.spark.sql.hive.CachedTableSuite`:
```
$ build/sbt -Phive -Phive-thriftserver "test:testOnly *CachedTableSuite"
```
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit f8bf72e)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Closes #31305 from MaxGekk/load-data-refresh-cache-3.0.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR aims to fix the Scala 2.12 release profile in `release-build.sh`.
### Why are the changes needed?
Since 3.0.0 (SPARK-26132), the release script is using `SCALA_2_11_PROFILES` to publish Scala 2.12 artifacts.
After looking at the code, this is not a blocker because `-Pscala-2.11` is no-op in `branch-3.x`. In addition `scala-2.12` profile is enabled by default and it's an empty profile without any configuration technically.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This is used by release manager only. Manually. This should land at `master/3.1/3.0`.
Closes #31310 from dongjoon-hyun/SPARK-34217.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit b8fc6f8)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 2bca383)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
The `RowBasedKeyValueBatch` has two different implementations, depending on whether the aggregation key and value use only fixed-length data types (`FixedLengthRowBasedKeyValueBatch`) or not (`VariableLengthRowBasedKeyValueBatch`).
Before this PR, the decision about which implementation to use was made by accessing the schema fields by name.
But if two fields have the same name, one with a variable-length type and the other with a fixed-length type (and all the other fields have fixed-length types), a bad decision could be made.
When `FixedLengthRowBasedKeyValueBatch` is chosen but there is a variable-length field, an aggregation function could compute with invalid values. This case is illustrated by the example used in the unit test:
`with T as (select id as a, -id as x from range(3)),
U as (select id as b, cast(id as string) as x from range(3))
select T.x, U.x, min(a) as ma, min(b) as mb from T join U on a=b group by U.x, T.x`
where the 'x' column in the left side of the join is a Long but on the right side is a String.
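The failure mode can be reproduced with a small toy model. Everything here is hypothetical (`Field`, `allFixedByName`, `allFixedByIndex`); it only illustrates why resolving fields by name is unsafe when names repeat, and it is not the code in `RowBasedKeyValueBatch`. In this model a name lookup resolves to the first matching field.

```scala
// Toy model: deciding "all fields are fixed length" by name vs. by position.
object FixedLengthCheck {
  final case class Field(name: String, fixedLength: Boolean)

  // Name-based lookup: with duplicate names, `find` always returns the first match,
  // so the second field called "x" is never inspected.
  def allFixedByName(schema: Seq[Field]): Boolean =
    schema.map(_.name).forall(n => schema.find(_.name == n).exists(_.fixedLength))

  // Position-based check: every field is inspected exactly once.
  def allFixedByIndex(schema: Seq[Field]): Boolean =
    schema.forall(_.fixedLength)

  def main(args: Array[String]): Unit = {
    // One `x` is a Long (fixed length), the other is a String (variable length).
    val schema = Seq(Field("x", fixedLength = true), Field("x", fixedLength = false))
    println(allFixedByName(schema))  // true, which is the wrong answer
    println(allFixedByIndex(schema)) // false
  }
}
```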
### Why are the changes needed?
Fixes the issue where an aggregation over duplicate field names produces null values in the DataFrame.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UT, tested manually on spark shell.
Closes #30788 from yliou/SPARK-33726.
Authored-by: yliou <yliou@berkeley.edu>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 512cacf)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ling when checking offset validation
### What changes were proposed in this pull request?
This patch uses the available offset range obtained while polling Kafka records to do the offset validation check.
### Why are the changes needed?
We have supported non-consecutive offsets for Kafka since 2.4.0. In `fetchRecord`, we validate an offset by checking whether it is in the available offset range. But currently we obtain the latest available offset range to do the check. This is not correct, because the available offset range can change during the batch, so it may differ from the range at the time we polled the records from Kafka.
It is possible that an offset is valid when polling but, by the time we do the above check, is out of the latest available offset range. We would then wrongly treat it as a data loss case and fail the query or drop the record.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This should pass existing unit tests.
This is hard to unit test because the Kafka producer and consumer are asynchronous. Further, we would also need to make the offset fall outside the new available offset range.
Closes #31328 from viirya/SPARK-34187-3.0.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
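The race described above can be shown with a minimal sketch. `AvailableRange` and `isValid` are hypothetical names, and the half-open interval `[earliest, latest)` is an assumption; the point is only that validation should use the range captured at poll time.

```scala
// Sketch of validating an offset against the range seen at poll time.
object OffsetValidation {
  // Hypothetical stand-in for an available offset range, assumed to be [earliest, latest).
  final case class AvailableRange(earliest: Long, latest: Long) {
    def contains(offset: Long): Boolean = offset >= earliest && offset < latest
  }

  // Validate against the range captured when the records were polled,
  // not against a range fetched again later.
  def isValid(offset: Long, rangeAtPoll: AvailableRange): Boolean =
    rangeAtPoll.contains(offset)

  def main(args: Array[String]): Unit = {
    val rangeAtPoll = AvailableRange(earliest = 10L, latest = 20L)
    val rangeLater = AvailableRange(earliest = 15L, latest = 30L) // old records aged out meanwhile
    val offset = 12L
    println(isValid(offset, rangeAtPoll)) // true
    println(rangeLater.contains(offset))  // false: checking here would look like data loss
  }
}
```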
…FAULT_PARTITION__` in v1 `In-Memory` catalog
### What changes were proposed in this pull request?
In the PR, I propose to convert `null` partition values to `"__HIVE_DEFAULT_PARTITION__"` before storing them in the `In-Memory` catalog internally. Currently, the `In-Memory` catalog maintains null partitions as `"__HIVE_DEFAULT_PARTITION__"` in the file system but as `null` values in memory, which can cause issues like the one in SPARK-34203.
### Why are the changes needed?
`InMemoryCatalog` stores partitions in the file system in the Hive-compatible form; for instance, it converts the `null` partition value to `"__HIVE_DEFAULT_PARTITION__"`, but at the same time it keeps null as is internally. That causes the issue demonstrated by the example below:
```
$ ./bin/spark-shell -c spark.sql.catalogImplementation=in-memory
```
```scala
scala> spark.conf.get("spark.sql.catalogImplementation")
res0: String = in-memory
scala> sql("CREATE TABLE tbl (col1 INT, p1 STRING) USING parquet PARTITIONED BY (p1)")
res1: org.apache.spark.sql.DataFrame = []
scala> sql("INSERT OVERWRITE TABLE tbl VALUES (0, null)")
res2: org.apache.spark.sql.DataFrame = []
scala> sql("ALTER TABLE tbl DROP PARTITION (p1 = null)")
org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException: The following partitions not found in table 'tbl' database 'default':
Map(p1 -> null)
at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.dropPartitions(InMemoryCatalog.scala:440)
```
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, `ALTER TABLE .. DROP PARTITION` can drop the `null` partition in the `In-Memory` catalog:
```scala
scala> spark.table("tbl").show(false)
+----+----+
|col1|p1  |
+----+----+
|0   |null|
+----+----+
scala> sql("ALTER TABLE tbl DROP PARTITION (p1 = null)")
res4: org.apache.spark.sql.DataFrame = []
scala> spark.table("tbl").show(false)
+----+---+
|col1|p1 |
+----+---+
+----+---+
```
### How was this patch tested?
Added new test to `DDLSuite`:
```
$ build/sbt -Phive -Phive-thriftserver "test:testOnly *CatalogedDDLSuite"
```
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit bfc0235)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Closes #31326 from MaxGekk/insert-overwrite-null-part-3.0.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
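The conversion proposed in this commit message can be pictured with a short sketch. `PartitionSpecNormalizer` and `normalize` are hypothetical names and this is not the `InMemoryCatalog` code; only the `__HIVE_DEFAULT_PARTITION__` constant comes from the text above.

```scala
// Sketch: replace null partition values with the Hive default partition name
// so the in-memory form agrees with the on-disk form.
object PartitionSpecNormalizer {
  val DefaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Hypothetical helper: maps every null value in a partition spec to the default name.
  def normalize(spec: Map[String, String]): Map[String, String] =
    spec.map { case (column, value) => column -> Option(value).getOrElse(DefaultPartitionName) }

  def main(args: Array[String]): Unit = {
    val spec = Map[String, String]("p1" -> null)
    println(normalize(spec)("p1")) // __HIVE_DEFAULT_PARTITION__
  }
}
```

If both the stored spec and the spec used for lookup go through the same normalization, a later lookup with `p1 = null` compares equal to the stored entry.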
…toHadoopFsRelationCommand
In a simple case such as the one below, `null` is passed to `InsertIntoHadoopFsRelationCommand` blindly; we should avoid the NPE.
```scala
test("NPE") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c string) USING $format PARTITIONED BY (c)")
sql("INSERT OVERWRITE t PARTITION (c=null) VALUES ('1')")
checkAnswer(spark.table("t"), Row("1", null))
}
}
```
```logtalk
java.lang.NullPointerException
at scala.collection.immutable.StringOps$.length(StringOps.scala:51)
at scala.collection.immutable.StringOps.length(StringOps.scala:51)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:35)
at scala.collection.IndexedSeqOptimized.foreach
at scala.collection.immutable.StringOps.foreach(StringOps.scala:33)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.escapePathName(ExternalCatalogUtils.scala:69)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.orig-s0.0000030000-r30676-expand-or-complete(InsertIntoHadoopFsRelationCommand.scala:231)
```
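### Why are the changes needed?
A bug fix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New tests.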
Closes #31320 from yaooqinn/SPARK-34223.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b3915dd)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR aims to fix Avro data source to use the decimal precision and scale of file schema.
### Why are the changes needed?
The decimal value should be interpreted with its original precision and scale. Otherwise, it returns incorrect results like the following. The schema mismatch happens when we use `userSpecifiedSchema`, when there are multiple files with inconsistent schemas, or when the HiveMetastore schema is updated by the user.
```scala
scala> sql("SELECT 3.14 a").write.format("avro").save("/tmp/avro")
scala> spark.read.schema("a DECIMAL(4, 3)").format("avro").load("/tmp/avro").show
+-----+
| a|
+-----+
|0.314|
+-----+
```
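The arithmetic behind the wrong result can be reproduced with plain `java.math.BigDecimal`, without Spark or Avro. The sketch assumes, for illustration, that the value is stored as an unscaled integer plus a scale; `decode` is a hypothetical helper.

```scala
import java.math.{BigDecimal => JBigDecimal, BigInteger}

// Sketch: the same unscaled integer read with two different scales.
object DecimalScale {
  // Hypothetical helper: rebuild a decimal from its unscaled value and a scale.
  def decode(unscaled: Long, scale: Int): JBigDecimal =
    new JBigDecimal(BigInteger.valueOf(unscaled), scale)

  def main(args: Array[String]): Unit = {
    // 3.14 corresponds to the unscaled integer 314 with scale 2.
    println(decode(314L, 2)) // 3.14  (scale from the file schema)
    println(decode(314L, 3)) // 0.314 (scale from a DECIMAL(4, 3) read schema)
  }
}
```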
### Does this PR introduce _any_ user-facing change?
Yes, this will return the correct result.
### How was this patch tested?
Pass the CI with the newly added test case.
Closes #31329 from dongjoon-hyun/SPARK-34229.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit 7d09eac)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…re all resource opened by `Source.fromXXX` are closed
### What changes were proposed in this pull request?
Calling a function like `.mkString` or `.getLines` directly on a `scala.io.Source` opened by `fromFile`, `fromURL`, or `fromURI` leaks the underlying file handle. This PR uses the `Utils.tryWithResource` method to wrap each `BufferedSource` and ensure it is closed.
### Why are the changes needed?
Avoid file handle leak.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes #31344 from LuciferYang/SPARK-34224-30.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
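The pattern can be sketched as follows. The `tryWithResource` defined here is a local stand-in written for this example (a plain loan pattern), assumed to behave like the `Utils.tryWithResource` named above; `readAll` is a hypothetical helper.

```scala
import java.io.Closeable
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.io.Source

// Sketch: always close a scala.io.Source, even if reading throws.
object SourceClosing {
  // Local stand-in for a loan-pattern helper; not Spark's implementation.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }

  // Reads the whole file and closes the underlying handle afterwards.
  def readAll(path: Path): String =
    tryWithResource(Source.fromFile(path.toFile, "UTF-8"))(_.mkString)

  def main(args: Array[String]): Unit = {
    val tmp = Files.createTempFile("source-closing", ".txt")
    Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8))
    println(readAll(tmp)) // hello
    Files.delete(tmp)
  }
}
```

By contrast, `Source.fromFile(path.toFile, "UTF-8").mkString` on its own never calls `close()`, which is the leak the PR addresses.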
This PR aims to fix the correctness issues when reading decimal values from Parquet files.
- For **MR** code path, `ParquetRowConverter` can read Parquet's decimal values with the original precision and scale written in the corresponding footer.
- For **Vectorized** code path, `VectorizedColumnReader` throws `SchemaColumnConvertNotSupportedException`.
Currently, Spark returns incorrect results when the Parquet file's decimal precision and scale are different from those in Spark's schema. This happens when there are multiple files with different decimal schemas or when the HiveMetastore has a new schema.
**BEFORE (Simplified example for correctness)**
```scala
scala> sql("SELECT 1.0 a").write.parquet("/tmp/decimal")
scala> spark.read.schema("a DECIMAL(3,2)").parquet("/tmp/decimal").show
+----+
| a|
+----+
|0.10|
+----+
```
This works correctly in the other data sources, `ORC/JSON/CSV`, like the following.
```scala
scala> sql("SELECT 1.0 a").write.orc("/tmp/decimal_orc")
scala> spark.read.schema("a DECIMAL(3,2)").orc("/tmp/decimal_orc").show
+----+
| a|
+----+
|1.00|
+----+
```
**AFTER**
1. **Vectorized** path: Instead of incorrect result, we will raise an explicit exception.
```scala
scala> spark.read.schema("a DECIMAL(3,2)").parquet("/tmp/decimal").show
java.lang.UnsupportedOperationException: Schema evolution not supported.
```
2. **MR** path (complex schema or explicit configuration): Spark returns correct results.
```scala
scala> spark.read.schema("a DECIMAL(3,2), b DECIMAL(18, 3), c MAP<INT,INT>").parquet("/tmp/decimal").show
+----+-------+--------+
| a| b| c|
+----+-------+--------+
|1.00|100.000|{1 -> 2}|
+----+-------+--------+
scala> spark.read.schema("a DECIMAL(3,2), b DECIMAL(18, 3), c MAP<INT,INT>").parquet("/tmp/decimal").printSchema
root
|-- a: decimal(3,2) (nullable = true)
|-- b: decimal(18,3) (nullable = true)
|-- c: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = true)
```
Yes. This fixes the correctness issue.
Pass with the newly added test case.
Closes #31319 from dongjoon-hyun/SPARK-34212.
Lead-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit dbf051c)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…oSuite test case
Change `AvroSuite."Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES"` to use `episodesAvro`, which is loaded as a resource using the classloader, instead of trying to read `episodes.avro` directly from a relative file path.
This is the proper way to read resource files, and currently this test will fail when called from my IntelliJ IDE, though it will succeed when called from Maven/sbt, presumably due to different working directory handling.
No, unit test only.
Previous failure from IntelliJ:
```
Source 'src/test/resources/episodes.avro' does not exist
java.io.FileNotFoundException: Source 'src/test/resources/episodes.avro' does not exist
at org.apache.commons.io.FileUtils.checkFileRequirements(FileUtils.java:1405)
at org.apache.commons.io.FileUtils.copyFile(FileUtils.java:1072)
at org.apache.commons.io.FileUtils.copyFile(FileUtils.java:1040)
at org.apache.spark.sql.avro.AvroSuite.$anonfun$new$34(AvroSuite.scala:397)
at org.apache.spark.sql.avro.AvroSuite.$anonfun$new$34$adapted(AvroSuite.scala:388)
```
Now it succeeds.
Closes #31332 from xkrogen/xkrogen-SPARK-34231-avrosuite-testfix.
Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit b2c104b)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…on-decimal fields as decimal
### What changes were proposed in this pull request?
This is a followup of #31319.
When reading parquet int/long as decimal, the behavior should be the same as reading int/long and then casting to the decimal type. This PR changes to the expected behavior.
When reading parquet binary as decimal, we don't really know how to interpret the binary (it may come from a string), and should fail. This PR changes to the expected behavior.
### Why are the changes needed?
To make the behavior more sane.
### Does this PR introduce _any_ user-facing change?
Yes, but it's a followup.
### How was this patch tested?
Updated test.
Closes #31357 from cloud-fan/bug.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 2dbb7d5)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…esponding error message can be displayed correctly
### What changes were proposed in this pull request?
Ensure that if a stage fails in the UI page, the corresponding error message can be displayed correctly.
### Why are the changes needed?
The error message is not handled properly in JavaScript. If `at` does not exist in the message, the error message on the page will be blank.
I made two changes:
1. `msg.indexOf("at")` => `msg.indexOf("\n")`

As shown above, truncating at the `at` position results in a strange summary of the error message. If there is a `\n`, it is more reasonable to truncate at the `\n` position.
2. If `\n` does not exist, check whether the length of msg is more than 100. If so, truncate the displayed text to avoid an overly long error message.
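The two rules above amount to the following logic. The real change is in the UI's JavaScript; this is a Scala sketch of the same idea, and `summarize` and `MaxLength` are hypothetical names.

```scala
// Sketch of the truncation rules: cut at the first newline if there is one,
// otherwise cap the message at 100 characters.
object ErrorSummary {
  val MaxLength = 100

  def summarize(msg: String): String = {
    val newline = msg.indexOf('\n')
    if (newline >= 0) msg.substring(0, newline)
    else if (msg.length > MaxLength) msg.substring(0, MaxLength)
    else msg
  }

  def main(args: Array[String]): Unit = {
    println(summarize("Job aborted\n  at Foo.bar(Foo.scala:1)")) // Job aborted
    println(summarize("short message"))                          // short message
    println(summarize("x" * 250).length)                         // 100
  }
}
```

A message without `at` or `\n` is now shown as is (or capped) instead of ending up blank.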
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually tested as shown below; this is only a JS change:
Before the change:

After the change:

Closes #31314 from akiyamaneko/error_message_display_empty.
Authored-by: neko <echohlne@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit f1bc37e)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
… a timeout
### What changes were proposed in this pull request?
This PR extends the `handle large number of containers and tasks (SPARK-18750)` test with a time limit and in case of timeout it saves the stack trace of the running thread to provide extra information about the reason why it got stuck.
### Why are the changes needed?
This is a flaky test which sometimes runs for hours without stopping.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I checked it with a temporary code change: by adding a `Thread.sleep` to `LocalityPreferredContainerPlacementStrategy#expectedHostToContainerCount`.
The stack trace showed the correct method:
```
[info] LocalityPlacementStrategySuite:
[info] - handle large number of containers and tasks (SPARK-18750) *** FAILED *** (30 seconds, 26 milliseconds)
[info] Failed with an exception or a timeout at thread join:
[info]
[info] java.lang.RuntimeException: Timeout at waiting for thread to stop (its stack trace is added to the exception)
[info] at java.lang.Thread.sleep(Native Method)
[info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.$anonfun$expectedHostToContainerCount$1(LocalityPreferredContainerPlacementStrategy.scala:198)
[info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy$$Lambda$281/381161906.apply(Unknown Source)
[info] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
[info] at scala.collection.TraversableLike$$Lambda$16/322836221.apply(Unknown Source)
[info] at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
[info] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468)
[info] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468)
[info] at scala.collection.TraversableLike.map(TraversableLike.scala:238)
[info] at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
[info] at scala.collection.AbstractTraversable.map(Traversable.scala:108)
[info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.expectedHostToContainerCount(LocalityPreferredContainerPlacementStrategy.scala:188)
[info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.localityOfRequestedContainers(LocalityPreferredContainerPlacementStrategy.scala:112)
[info] at org.apache.spark.deploy.yarn.LocalityPlacementStrategySuite.org$apache$spark$deploy$yarn$LocalityPlacementStrategySuite$$runTest(LocalityPlacementStrategySuite.scala:94)
[info] at org.apache.spark.deploy.yarn.LocalityPlacementStrategySuite$$anon$1.run(LocalityPlacementStrategySuite.scala:40)
[info] at java.lang.Thread.run(Thread.java:748) (LocalityPlacementStrategySuite.scala:61)
...
```
Closes #31363 from attilapiros/SPARK-34154.
Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit 0dedf24)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
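The general technique (join with a time limit, then capture the stuck thread's stack trace) can be sketched with the standard `Thread` API. `TimedJoin` and `stackTraceIfStuck` are hypothetical names; this is not the code added to `LocalityPlacementStrategySuite`.

```scala
// Sketch: run work on a thread, wait with a timeout, and report where it is stuck.
object TimedJoin {
  // Runs `body` on a separate thread and waits at most `timeoutMs` for it.
  // Returns the thread's current stack trace if it is still running after the timeout.
  def stackTraceIfStuck(body: () => Unit, timeoutMs: Long): Option[Seq[StackTraceElement]] = {
    val thread = new Thread(new Runnable {
      override def run(): Unit = body()
    })
    thread.setDaemon(true) // do not keep the JVM alive for a stuck thread
    thread.start()
    thread.join(timeoutMs)
    if (thread.isAlive) Some(thread.getStackTrace.toSeq) else None
  }

  def main(args: Array[String]): Unit = {
    val stuck = stackTraceIfStuck(() => Thread.sleep(10000L), timeoutMs = 200L)
    stuck.foreach(trace => trace.foreach(frame => println(s"at $frame")))
  }
}
```

A test can then fail with an exception that carries the captured frames, which is what makes a hang diagnosable from the log alone.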
…unction
### What changes were proposed in this pull request?
This PR corrects the documentation of the `concat_ws` function.
### Why are the changes needed?
`concat_ws` doesn't need any str or array(str) arguments:
```
scala> sql("""select concat_ws("s")""").show
+------------+
|concat_ws(s)|
+------------+
| |
+------------+
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
```
build/sbt "sql/testOnly *.ExpressionInfoSuite"
```
Closes #31370 from wangyum/SPARK-34268.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit 01d11da)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…ew twice
### What changes were proposed in this pull request?
PR #30140 compares the new and old plans when replacing a view, and uncaches data if the view has changed. But the new plan used in the comparison is not analyzed, which causes an `UnresolvedException` when calling `sameResult`. So in this PR, we use the analyzed plan for the comparison to fix this problem.
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Newly added tests.
Closes #31365 from linhongliu-db/SPARK-34260-3.0.
Lead-authored-by: Linhong Liu <linhong.liu@databricks.com>
Co-authored-by: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…LE .. SET LOCATION`
### What changes were proposed in this pull request?
Invoke `CatalogImpl.refreshTable()` in the v1 implementation of the `ALTER TABLE .. SET LOCATION` command to refresh cached table data.
### Why are the changes needed?
The example below illustrates the issue:
- Create a source table:
```sql
spark-sql> CREATE TABLE src_tbl (c0 int, part int) USING hive PARTITIONED BY (part);
spark-sql> INSERT INTO src_tbl PARTITION (part=0) SELECT 0;
spark-sql> SHOW TABLE EXTENDED LIKE 'src_tbl' PARTITION (part=0);
default src_tbl false Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/refresh-cache-set-location/spark-warehouse/src_tbl/part=0
...
```
- Set new location for the empty partition (part=0):
```sql
spark-sql> CREATE TABLE dst_tbl (c0 int, part int) USING hive PARTITIONED BY (part);
spark-sql> ALTER TABLE dst_tbl ADD PARTITION (part=0);
spark-sql> INSERT INTO dst_tbl PARTITION (part=1) SELECT 1;
spark-sql> CACHE TABLE dst_tbl;
spark-sql> SELECT * FROM dst_tbl;
1 1
spark-sql> ALTER TABLE dst_tbl PARTITION (part=0) SET LOCATION '/Users/maximgekk/proj/refresh-cache-set-location/spark-warehouse/src_tbl/part=0';
spark-sql> SELECT * FROM dst_tbl;
1 1
```
The last query does not return the newly loaded data.
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the example above works correctly:
```sql
spark-sql> ALTER TABLE dst_tbl PARTITION (part=0) SET LOCATION '/Users/maximgekk/proj/refresh-cache-set-location/spark-warehouse/src_tbl/part=0';
spark-sql> SELECT * FROM dst_tbl;
0 0
1 1
```
### How was this patch tested?
Added new test to `org.apache.spark.sql.hive.CachedTableSuite`:
```
$ build/sbt -Phive -Phive-thriftserver "test:testOnly *CachedTableSuite"
```
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit d242166)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Closes #31380 from MaxGekk/refresh-cache-set-location-3.0.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Use `count` to simplify the `filter + size (or length)` operation. It is semantically equivalent but simpler.
**Before**
```
seq.filter(p).size
```
**After**
```
seq.count(p)
```
### Why are the changes needed?
Code simplification.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes #31375 from LuciferYang/SPARK-34275-30.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…is stopped
### What changes were proposed in this pull request?
This PR aims to prevent `HeartbeatReceiver` from asking `Executor` to re-register its block manager when the SparkContext is already stopped.
### Why are the changes needed?
Currently, `HeartbeatReceiver` blindly asks for re-registration on a new heartbeat message. However, when the SparkContext is stopped, there is no need to re-register a new block manager. Re-registration causes unnecessary executor logs and a delay in job termination.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs with the newly added test case.
Closes #31373 from dongjoon-hyun/SPARK-34273.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit bc41c5a)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
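The decision described in this commit message can be sketched in plain Python. This is only a toy model, not Spark's Scala code: the class `HeartbeatReceiverSketch`, its fields, and the executor id are hypothetical names chosen for illustration, and the assumption is that re-registration is only ever requested for executors the driver does not know.

```python
class HeartbeatReceiverSketch:
    """Toy model of driver-side heartbeat handling (hypothetical, not Spark's API)."""

    def __init__(self):
        self.known_executors = set()   # executors the driver already tracks
        self.context_stopped = False   # stands in for "SparkContext is stopped"

    def receive_heartbeat(self, executor_id):
        """Return True if the executor should re-register its block manager."""
        if executor_id in self.known_executors:
            return False
        # Unknown executor: ask for re-registration only while the context is alive.
        return not self.context_stopped


receiver = HeartbeatReceiverSketch()
before_stop = receiver.receive_heartbeat("exec-1")
receiver.context_stopped = True
after_stop = receiver.receive_heartbeat("exec-1")
```

With the context still running, the unknown executor is asked to re-register; once the context is marked stopped, the same heartbeat no longer triggers the request.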
…hen generating SQL queries ### What changes were proposed in this pull request? When generating SQL queries only the old date time API types are handled for values in org.apache.spark.sql.jdbc.JdbcDialect#compileValue. If the new API is used (spark.sql.datetime.java8API.enabled=true) Instant and LocalDate values are not quoted and errors are thrown. The change proposed is to handle Instant and LocalDate values the same way that Timestamp and Date are. NOTE: This backport PR comes from #31148. ### Why are the changes needed? In the current state if an Instant is used in a filter, an exception will be thrown. Ex (dataset was read from PostgreSQL): dataset.filter(current_timestamp().gt(col(VALID_FROM))) Stacktrace (the T11 is from an instant formatted like yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'): Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or near "T11"Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or near "T11" Position: 285 at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2103) at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1836) at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:257) at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:512) at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:388) at org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:273) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test added Closes #31381 from cristichircu/SPARK-33867_3_0. Authored-by: Cristi Chircu <cristian.chircu@gmail.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
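To illustrate why an unquoted timestamp produces the `syntax error at or near "T11"` above, here is a minimal plain-Python sketch of the quoting idea. It is not Spark's `JdbcDialect#compileValue`; the function `compile_value` and the sample timestamp are assumptions made for the example.

```python
from datetime import date, datetime


def compile_value(value):
    """Render a Python value as a SQL literal (hypothetical helper)."""
    if isinstance(value, str):
        # Escape embedded single quotes by doubling them.
        return "'" + value.replace("'", "''") + "'"
    # datetime is a subclass of date, so it must be checked first.
    if isinstance(value, datetime):
        return "'" + value.strftime("%Y-%m-%d %H:%M:%S.%f") + "'"
    if isinstance(value, date):
        return "'" + value.isoformat() + "'"
    return str(value)


ts = datetime(2021, 1, 27, 11, 0, 0)
# Without quoting, the ISO form leaks a bare "T11" token into the SQL text.
unquoted = f"VALID_FROM < {ts.isoformat()}"
# With quoting, the value is a single string literal the database can parse.
quoted = f"VALID_FROM < {compile_value(ts)}"
```

The point of the sketch is only that the newer date/time types need the same literal quoting as the older ones before being embedded in a generated query.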
…te and Instant values to a table ### What changes were proposed in this pull request? When writing rows to a table only the old date time API types are handled in org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils#makeSetter. If the new API is used (spark.sql.datetime.java8API.enabled=true) casting Instant and LocalDate to Timestamp and Date respectively fails. The proposed change is to handle Instant and LocalDate values and transform them to Timestamp and Date. ### Why are the changes needed? In the current state writing Instant or LocalDate values to a table fails with something like: Caused by: java.lang.ClassCastException: class java.time.LocalDate cannot be cast to class java.sql.Date (java.time.LocalDate is in module java.base of loader 'bootstrap'; java.sql.Date is in module java.sql of loader 'platform') at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$11(JdbcUtils.scala:573) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$11$adapted(JdbcUtils.scala:572) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:678) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:858) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:856) at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:994) at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:994) at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449) ... 
3 more ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No Closes #31382 from cristichircu/SPARK-34144_3_0. Lead-authored-by: Cristi Chircu <chircu@arezzosky.com> Co-authored-by: Cristi Chircu <cristian.chircu@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…data key 'org.apache.spark.legacyDateTime' ### What changes were proposed in this pull request? 1. Test both date and timestamp column types 2. Write the timestamp as the `TIMESTAMP_MICROS` logical type 3. Change the timestamp value to `'1000-01-01 01:02:03'` to check exception throwing. ### Why are the changes needed? To improve test coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running the modified test suite: ``` $ build/sbt "testOnly org.apache.spark.sql.execution.datasources.parquet.ParquetIOSuite" ``` Closes #31396 from MaxGekk/parquet-test-metakey-followup. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> (cherry picked from commit 588ddcd) Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…e test ### What changes were proposed in this pull request? Fixing the flaky `handle large number of containers and tasks (SPARK-18750)` by avoiding to use `DNSToSwitchMapping` as in some situation DNS lookup could be extremely slow. ### Why are the changes needed? After #31363 was merged the flaky `handle large number of containers and tasks (SPARK-18750)` test failed again in some other PRs but now we have the exact place where the test is stuck. It is in the DNS lookup: ``` [info] - handle large number of containers and tasks (SPARK-18750) *** FAILED *** (30 seconds, 4 milliseconds) [info] Failed with an exception or a timeout at thread join: [info] [info] java.lang.RuntimeException: Timeout at waiting for thread to stop (its stack trace is added to the exception) [info] at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) [info] at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929) [info] at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324) [info] at java.net.InetAddress.getAllByName0(InetAddress.java:1277) [info] at java.net.InetAddress.getAllByName(InetAddress.java:1193) [info] at java.net.InetAddress.getAllByName(InetAddress.java:1127) [info] at java.net.InetAddress.getByName(InetAddress.java:1077) [info] at org.apache.hadoop.net.NetUtils.normalizeHostName(NetUtils.java:568) [info] at org.apache.hadoop.net.NetUtils.normalizeHostNames(NetUtils.java:585) [info] at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:109) [info] at org.apache.spark.deploy.yarn.SparkRackResolver.coreResolve(SparkRackResolver.scala:75) [info] at org.apache.spark.deploy.yarn.SparkRackResolver.resolve(SparkRackResolver.scala:66) [info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.$anonfun$localityOfRequestedContainers$3(LocalityPreferredContainerPlacementStrategy.scala:142) [info] at 
org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy$$Lambda$658/1080992036.apply$mcVI$sp(Unknown Source) [info] at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) [info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.localityOfRequestedContainers(LocalityPreferredContainerPlacementStrategy.scala:138) [info] at org.apache.spark.deploy.yarn.LocalityPlacementStrategySuite.org$apache$spark$deploy$yarn$LocalityPlacementStrategySuite$$runTest(LocalityPlacementStrategySuite.scala:94) [info] at org.apache.spark.deploy.yarn.LocalityPlacementStrategySuite$$anon$1.run(LocalityPlacementStrategySuite.scala:40) [info] at java.lang.Thread.run(Thread.java:748) (LocalityPlacementStrategySuite.scala:61) ... ``` This could be because of the DNS servers used by those build machines are not configured to handle IPv6 queries and the client has to wait for the IPv6 query to timeout before falling back to IPv4. This even make the tests more consistent. As when a single host was given to lookup via `resolve(hostName: String)` it gave a different answer from calling `resolve(hostNames: Seq[String])` with a `Seq` containing that single host. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests. Closes #31397 from attilapiros/SPARK-34154-2nd. Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> (cherry picked from commit d3f049c) Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…StoreCustomMetric
### What changes were proposed in this pull request?
This patch proposes to sum up custom metric values instead of taking an arbitrary one when combining `StateStoreMetrics`.
### Why are the changes needed?
For stateful joins in Structured Streaming, we need to combine `StateStoreMetrics` from both the left and right sides. Currently, for custom metrics with the same name on both sides, we simply take an arbitrary one. As a result, half of the metric values are lost.
### Does this PR introduce _any_ user-facing change?
Yes, this corrects the metrics collected for stateful joins.
### How was this patch tested?
Unit test.
Closes #31369 from viirya/SPARK-34270.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 50d14c9)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
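The difference between "take an arbitrary one" and "sum up" can be sketched with plain Python dictionaries. This is an illustration only, not the `StateStoreMetrics` code; the function name and the metric names `metricA` and `metricB` are hypothetical.

```python
def combine_custom_metrics(left, right):
    """Sum custom metric values that share a name (hypothetical helper)."""
    combined = dict(left)
    for name, value in right.items():
        combined[name] = combined.get(name, 0) + value
    return combined


left_side = {"metricA": 3, "metricB": 10}
right_side = {"metricA": 4}

# Merging the maps keeps only one side's value for a shared name.
arbitrary = {**left_side, **right_side}
# Summing keeps the contribution of both sides.
summed = combine_custom_metrics(left_side, right_side)
```

In the merged map `metricA` reflects only the right side, while the summed map accounts for both sides of the join.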
### What changes were proposed in this pull request?
Replaces `collection.map(f1).flatten(f2)` with `collection.flatMap` where possible. It is semantically equivalent but simpler.
### Why are the changes needed?
Code simplification.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes #31418 from LuciferYang/SPARK-34310-30.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…Pandas/MapInPandas
### What changes were proposed in this pull request?
Resolve duplicate attributes for `FlatMapCoGroupsInPandas`.
### Why are the changes needed?
When performing self-join on top of `FlatMapCoGroupsInPandas`, analysis can fail because of conflicting attributes. For example,
```python
df = spark.createDataFrame([(1, 1)], ("column", "value"))
row = df.groupby("ColUmn").cogroup(
    df.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, "column long, value long")
row.join(row).show()
```
error:
```
...
Conflicting attributes: column#163321L,value#163322L
;;
'Join Inner
:- FlatMapCoGroupsInPandas [ColUmn#163312L], [COLUMN#163312L], <lambda>(column#163312L, value#163313L, column#163312L, value#163313L), [column#163321L, value#163322L]
:  :- Project [ColUmn#163312L, column#163312L, value#163313L]
:  :  +- LogicalRDD [column#163312L, value#163313L], false
:  +- Project [COLUMN#163312L, column#163312L, value#163313L]
:     +- LogicalRDD [column#163312L, value#163313L], false
+- FlatMapCoGroupsInPandas [ColUmn#163312L], [COLUMN#163312L], <lambda>(column#163312L, value#163313L, column#163312L, value#163313L), [column#163321L, value#163322L]
   :- Project [ColUmn#163312L, column#163312L, value#163313L]
   :  +- LogicalRDD [column#163312L, value#163313L], false
   +- Project [COLUMN#163312L, column#163312L, value#163313L]
      +- LogicalRDD [column#163312L, value#163313L], false
...
```
### Does this PR introduce _any_ user-facing change?
Yes, a query like the above example no longer fails.
### How was this patch tested?
Added unit tests.
Closes #31429 from Ngone51/fix-conflcting-attrs-of-FlatMapCoGroupsInPandas.
Lead-authored-by: yi.wu <yi.wu@databricks.com>
Co-authored-by: wuyi <yi.wu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit e9362c2)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fixes two minor problems in SparkR `examples`:
- Replace a misplaced standard comment (`#`) with a roxygen comment (`#'`) in `sparkR.session` `examples`
- Add a missing comma in `write.stream` examples.
### Why are the changes needed?
- `sparkR.session` examples are not fully rendered.
- `write.stream` example is not syntactically valid.
### Does this PR introduce _any_ user-facing change?
Docs only.
### How was this patch tested?
Manual inspection of the built docs.
Closes #34654 from zero323/sparkr-docs-fixes.
Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 207dd4c)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
… UTs when using `hadoop-3.2` profile without `assembly/target/scala-%s/jars`
### What changes were proposed in this pull request?
`YarnShuffleIntegrationSuite`, `YarnShuffleAuthSuite` and `YarnShuffleAlternateNameConfigSuite` fail when using the `hadoop-3.2` profile without `assembly/target/scala-%s/jars`; the failure reason is `java.lang.NoClassDefFoundError: breeze/linalg/Matrix`.
The above UTs succeed when using the `hadoop-2.7` profile without `assembly/target/scala-%s/jars` because `KryoSerializer.loadableSparkClasses` can work around the missing classes when `Utils.isTesting` is true, but `Utils.isTesting` is false when using the `hadoop-3.2` profile.
After investigating, I found that when the `hadoop-2.7` profile is used, `SPARK_TESTING` is propagated to the AM and Executor, but when the `hadoop-3.2` profile is used, `SPARK_TESTING` is not propagated to the AM and Executor.
To ensure consistent behavior between `hadoop-2.7` and `hadoop-3.2`, this PR manually propagates the `SPARK_TESTING` environment variable, if it exists, so that `Utils.isTesting` is true in the above test scenario.
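The "propagate it only if it exists" step can be sketched as follows in plain Python. This is not the YARN client code touched by the PR; `build_container_env` and `is_testing` are hypothetical helpers, and `is_testing` only approximates `Utils.isTesting` by checking for the environment variable.

```python
def build_container_env(parent_env, base_env=None):
    """Copy SPARK_TESTING into a child environment if the parent defines it."""
    env = dict(base_env or {})
    if "SPARK_TESTING" in parent_env:
        env["SPARK_TESTING"] = parent_env["SPARK_TESTING"]
    return env


def is_testing(env):
    """Simplified stand-in for a testing check based on the environment."""
    return "SPARK_TESTING" in env


with_flag = build_container_env({"SPARK_TESTING": "1", "HOME": "/tmp"})
without_flag = build_container_env({"HOME": "/tmp"})
```

Only the testing flag is forwarded; unrelated variables from the parent environment are left out, and nothing is added when the flag is absent.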
### Why are the changes needed?
Ensure `YarnShuffleIntegrationSuite`-related UTs can succeed when using the `hadoop-3.2` profile without `assembly/target/scala-%s/jars`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass the Jenkins or GitHub Action
- Manual test `YarnShuffleIntegrationSuite`. `YarnShuffleAuthSuite` and `YarnShuffleAlternateNameConfigSuite` can be verified in the same way.
Please ensure that the `assembly/target/scala-%s/jars` directory does not exist before executing the test command. We can clean up the whole project by executing the following command or by cloning a fresh local copy of the repo.
1. run with `hadoop-3.2` profile
```
mvn clean install -Phadoop-3.2 -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationSuite
```
**Before**
```
YarnShuffleIntegrationSuite:
- external shuffle service *** FAILED ***
FAILED did not equal FINISHED (stdout/stderr was not captured) (BaseYarnClusterSuite.scala:227)
Run completed in 48 seconds, 137 milliseconds.
Total number of tests run: 1
Suites: completed 2, aborted 0
Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***
```
Error stack as follows:
```
21/11/20 23:00:09.682 main ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times,
most recent failure: Lost task 0.3 in stage 0.0 (TID 6) (localhost executor 1): java.lang.NoClassDefFoundError: breeze/linalg/Matrix
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:216)
at org.apache.spark.serializer.KryoSerializer$.$anonfun$loadableSparkClasses$1(KryoSerializer.scala:537)
at scala.collection.immutable.List.flatMap(List.scala:366)
at org.apache.spark.serializer.KryoSerializer$.loadableSparkClasses$lzycompute(KryoSerializer.scala:535)
at org.apache.spark.serializer.KryoSerializer$.org$apache$spark$serializer$KryoSerializer$$loadableSparkClasses(KryoSerializer.scala:502)
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:226)
at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:102)
at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:109)
at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:346)
at org.apache.spark.serializer.KryoSerializationStream.<init>(KryoSerializer.scala:266)
at org.apache.spark.serializer.KryoSerializerInstance.serializeStream(KryoSerializer.scala:432)
at org.apache.spark.shuffle.ShufflePartitionPairsWriter.open(ShufflePartitionPairsWriter.scala:76)
at org.apache.spark.shuffle.ShufflePartitionPairsWriter.write(ShufflePartitionPairsWriter.scala:59)
at org.apache.spark.util.collection.WritablePartitionedIterator.writeNext(WritablePartitionedPairCollection.scala:83)
at org.apache.spark.util.collection.ExternalSorter.$anonfun$writePartitionedMapOutput$1(ExternalSorter.scala:772)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedMapOutput(ExternalSorter.scala:775)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: breeze.linalg.Matrix
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 32 more
```
**After**
```
YarnShuffleIntegrationSuite:
- external shuffle service
Run completed in 35 seconds, 188 milliseconds.
Total number of tests run: 1
Suites: completed 2, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
2. run with `hadoop-2.7` profile
```
mvn clean install -Phadoop-2.7 -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationSuite
```
**Before**
```
YarnShuffleIntegrationSuite:
- external shuffle service
Run completed in 30 seconds, 828 milliseconds.
Total number of tests run: 1
Suites: completed 2, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
**After**
```
YarnShuffleIntegrationSuite:
- external shuffle service
Run completed in 30 seconds, 967 milliseconds.
Total number of tests run: 1
Suites: completed 2, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
Closes #34620 from LuciferYang/SPARK-37209.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit a7b3fc7)
Signed-off-by: Sean Owen <srowen@gmail.com>
…rtition pruning
### What changes were proposed in this pull request?
Drop all tables after testing dynamic partition pruning.
### Why are the changes needed?
We should drop all tables after testing dynamic partition pruning.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #34768 from weixiuli/SPARK-11150-fix.
Authored-by: weixiuli <weixiuli@jd.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 2433c94)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
**What changes were proposed in this pull request?**
Change the deserialization mapping for the primitive type void.
**Why are the changes needed?**
The void primitive type in Scala should be `classOf[Unit]`, not `classOf[Void]`. Spark erroneously [maps it](https://github.com/apache/spark/blob/v3.2.0/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala#L80) differently from all other primitive types. Here is the code:
```
private object JavaDeserializationStream {
  val primitiveMappings = Map[String, Class[_]](
    "boolean" -> classOf[Boolean],
    "byte" -> classOf[Byte],
    "char" -> classOf[Char],
    "short" -> classOf[Short],
    "int" -> classOf[Int],
    "long" -> classOf[Long],
    "float" -> classOf[Float],
    "double" -> classOf[Double],
    "void" -> classOf[Void]
  )
}
```
Here is the demonstration:
```
scala> classOf[Long]
val res0: Class[Long] = long

scala> classOf[Double]
val res1: Class[Double] = double

scala> classOf[Byte]
val res2: Class[Byte] = byte

scala> classOf[Void]
val res3: Class[Void] = class java.lang.Void  <--- this is wrong

scala> classOf[Unit]
val res4: Class[Unit] = void  <---- this is right
```
This results in a Spark deserialization error if the Spark code contains the void primitive type:
`java.io.InvalidClassException: java.lang.Void; local class name incompatible with stream class name "void"`
**Does this PR introduce any user-facing change?**
no
**How was this patch tested?**
Updated the existing test, and also tested end to end with code that previously hit the deserialization error; it now passes.
Closes #34816 from daijyc/voidtype.
Authored-by: Daniel Dai <jdai@pinterest.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit fb40c0e)
Signed-off-by: Sean Owen <srowen@gmail.com>
… lastProgress
### What changes were proposed in this pull request?
This small PR fixes incorrect documentation in the Structured Streaming Guide, where Python's `recentProgress` and `lastProgress` were shown as functions although they are [properties](https://github.com/apache/spark/blob/master/python/pyspark/sql/streaming.py#L117). Calling them as functions raises an error:
```
>>> query.lastProgress()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'dict' object is not callable
```
### Why are the changes needed?
The documentation was erroneous and needs to be fixed to avoid confusing readers.
### Does this PR introduce _any_ user-facing change?
Yes, it is a fix to the documentation.
### How was this patch tested?
Not necessary.
Closes #34947 from alexott/fix-python-recent-progress-docs.
Authored-by: Alex Ott <alexott@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit bad96e6)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
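The property-versus-function distinction behind this documentation fix can be reproduced without Spark. The class below is a hypothetical stand-in for a streaming query object, not PySpark's `StreamingQuery`; only the attribute names `recentProgress` and `lastProgress` are taken from the text above.

```python
class StreamingQuerySketch:
    """Minimal stand-in exposing progress as properties (hypothetical)."""

    def __init__(self, progress_events):
        self._events = list(progress_events)

    @property
    def recentProgress(self):
        # Returns a copy of all recorded progress updates.
        return list(self._events)

    @property
    def lastProgress(self):
        # Returns the most recent update, or None if there is none yet.
        return self._events[-1] if self._events else None


query = StreamingQuerySketch([{"batchId": 0}, {"batchId": 1}])
last = query.lastProgress          # correct: attribute access, no parentheses

try:
    query.lastProgress()           # incorrect: tries to call the returned dict
except TypeError as exc:
    error = str(exc)
```

Accessing the property returns the dictionary directly; adding parentheses attempts to call that dictionary, which is what produces the `TypeError` shown in the traceback.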
…(de)serialization This PR proposes to add a driver-side check on `supportsColumnar` sanity check at `ColumnarToRowExec`. SPARK-23731 fixed the plans to be serializable by leveraging lazy but SPARK-28213 happened to refer to the lazy variable at: https://github.com/apache/spark/blob/77b164aac9764049a4820064421ef82ec0bc14fb/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L68 This can fail during canonicalization during, for example, eliminating sub common expressions (on executor side): ``` java.lang.NullPointerException at org.apache.spark.sql.execution.FileSourceScanExec.supportsColumnar$lzycompute(DataSourceScanExec.scala:280) at org.apache.spark.sql.execution.FileSourceScanExec.supportsColumnar(DataSourceScanExec.scala:279) at org.apache.spark.sql.execution.InputAdapter.supportsColumnar(WholeStageCodegenExec.scala:509) at org.apache.spark.sql.execution.ColumnarToRowExec.<init>(Columnar.scala:67) ... at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:581) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:580) at org.apache.spark.sql.execution.ScalarSubquery.canonicalized$lzycompute(subquery.scala:110) ... at org.apache.spark.sql.catalyst.expressions.ExpressionEquals.hashCode(EquivalentExpressions.scala:275) ... 
at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135) at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44) at scala.collection.mutable.HashMap.get(HashMap.scala:74) at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:46) at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTreeHelper$1(EquivalentExpressions.scala:147) at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:170) at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$1(SubExprEvaluationRuntime.scala:89) at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$1$adapted(SubExprEvaluationRuntime.scala:89) at scala.collection.immutable.List.foreach(List.scala:392) ``` This fix is still a bandaid fix but at least addresses the issue with minimized change - this fix should ideally be backported too. Pretty unlikely - when `ColumnarToRowExec` has to be canonicalized on the executor side (see the stacktrace), but yes. it would fix a bug. Unittest was added. Closes #35058 from HyukjinKwon/SPARK-37779. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 195f1aa) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Author
It can be determined that the first SQL returns no results.
Member
Please file an issue in JIRA: https://issues.apache.org/jira/projects/SPARK
As the title indicates, I encountered this problem on v3.0.2.
The same SQL executes successfully on Spark 2.4.7 and returns the expected result data.
I am very confused and hope to get answers or ideas from the community.
The following is the SQL statement (some fields have been disguised):
select h_code ,sum(box_num) box_num
from
(
select h_code,box_num
from dm.stock_info
where dt='2021-12-23' and code in ('100') and box_num>0
union all
SELECT h_code,sum(field1) as box_num
from dw.order_item1 where dt='2021-12-23'
and is_finish =0 and is_valid=1 and to_date(order_time)>='2021-12-07'
and code='100' and box_num>0
GROUP by h_code
union ALL
select h_code,sum(field1) as box_num
from dw.order_item2
where dt='2021-12-23' and is_valid=1 and is_pay=1 and is_rk=0 and is_close=0 and is_delete=0 and to_date(pay_date)>='2021-12-07'
and code='100'
group by h_code
)aa
group by h_code
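For reference, the expected `UNION ALL` semantics (a branch that returns 0 rows should not affect rows from the other branches) can be checked with a small stand-alone sketch. It uses Python's built-in `sqlite3` rather than Spark, so it does not reproduce the reported failure; the table contents are invented, and the filters from the original query are mostly omitted for brevity.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stock_info  (h_code TEXT, box_num INTEGER);
CREATE TABLE order_item1 (h_code TEXT, field1 INTEGER);
CREATE TABLE order_item2 (h_code TEXT, field1 INTEGER);
INSERT INTO order_item1 VALUES ('A', 2), ('A', 3);
INSERT INTO order_item2 VALUES ('A', 5), ('B', 7);
""")

# stock_info is left empty, so the first branch of the union returns 0 rows.
rows = conn.execute("""
SELECT h_code, SUM(box_num) AS box_num
FROM (
    SELECT h_code, box_num FROM stock_info WHERE box_num > 0
    UNION ALL
    SELECT h_code, SUM(field1) AS box_num FROM order_item1 GROUP BY h_code
    UNION ALL
    SELECT h_code, SUM(field1) AS box_num FROM order_item2 GROUP BY h_code
) aa
GROUP BY h_code
ORDER BY h_code
""").fetchall()
conn.close()
```

Here the outer aggregation still returns the rows contributed by the second and third branches, which matches the behavior the report describes for Spark 2.4.7.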