From c0ed91f064e2375a30d12e92373e39ab117c9033 Mon Sep 17 00:00:00 2001 From: Petar Vasiljevic Date: Wed, 23 Oct 2024 18:19:12 +0200 Subject: [PATCH 1/9] initial commit --- .../jdbc/v2/PostgresIntegrationSuite.scala | 38 ++++++++++++++++++- .../apache/spark/sql/internal/SQLConf.scala | 13 +++++++ .../spark/sql/jdbc/PostgresDialect.scala | 7 +++- 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index f70b500f974a4..54642665c5c40 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -23,10 +23,10 @@ import org.apache.spark.{SparkConf, SparkSQLException} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.DatabaseOnDocker import org.apache.spark.sql.types._ import org.apache.spark.tags.DockerTest - /** * To run this test suite for a specific version (e.g., postgres:17.0-alpine) * {{{ @@ -125,6 +125,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT connection.prepareStatement( "CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)") .executeUpdate() + + connection.prepareStatement("CREATE TABLE array_of_array_of_int (col int[][])") + .executeUpdate() + connection.prepareStatement("INSERT INTO array_of_array_of_int " + + "VALUES (array[array[1],array[2]])").executeUpdate() + connection.prepareStatement("CREATE TABLE ctas_array_of_array_of_int " + + "AS SELECT * FROM array_of_array_of_int").executeUpdate() } test("Test multi-dimensional column types") { @@ -302,4 +309,33 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT assert(rows10(0).getString(0) === "amy") assert(rows10(1).getString(0) === "alex") } + + test("Test reading 2d array from table created via CTAS command - negative test") { + withSQLConf(SQLConf.POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY.key -> "false") { + val exception = intercept[org.apache.spark.SparkException] { + sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int").collect() + }.getMessage + + assert(exception.contains("""Bad value for type int : {{1},{2}}""")) + } + } + + test("Test reading 2d array from table created via CTAS command - positive test") { + val rowsWithOldBehaviour = + withSQLConf( + SQLConf.POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY.key -> "false" + ) { + sql(s"SELECT * FROM $catalogName.array_of_array_of_int").collect() + } + + withSQLConf( + SQLConf.POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY.key -> "true" + ) { + val dfWithNewBehaviour = sql(s"SELECT * FROM $catalogName.array_of_array_of_int") + val CTASdfWithNewBehaviour = sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int") + + checkAnswer(dfWithNewBehaviour, rowsWithOldBehaviour) + checkAnswer(CTASdfWithNewBehaviour, rowsWithOldBehaviour) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5218a683a8fa8..9f20bf7df23a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4463,6 +4463,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY = + buildConf("spark.sql.legacy.postgres.properReadingOfArrayDimensionality") + .internal() + .doc("When true postgres will be asked to return the dimension of array column" + + "by calling array_ndims(col). When false, old way of calling pg_attribute.attndims" + + "will be done.") + .version("4.0.0") + .booleanConf + .createWithDefault(true) + val CSV_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.csv.filterPushdown.enabled") .doc("When true, enable filter pushdown to CSV datasource.") .version("3.0.0") @@ -5728,6 +5738,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def legacyPostgresDatetimeMappingEnabled: Boolean = getConf(LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED) + def postgresProperReadingOfArrayDimensionality: Boolean = + getConf(POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY) + override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = { LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 1265550b3f19d..8b3ca6ed05a92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -382,7 +382,11 @@ private case class PostgresDialect() case Types.ARRAY => val tableName = rsmd.getTableName(columnIdx) val columnName = rsmd.getColumnName(columnIdx) - val query = + val query = if (conf.postgresProperReadingOfArrayDimensionality) { + s""" + |SELECT min(array_ndims($columnName)) FROM $tableName + |""".stripMargin + } else { s""" |SELECT pg_attribute.attndims |FROM pg_attribute @@ -390,6 +394,7 @@ private case class PostgresDialect() | JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName' |""".stripMargin + } try { Using.resource(conn.createStatement()) { stmt => Using.resource(stmt.executeQuery(query)) { rs => From 63e756d0847653b01f2752876014aa208a562edd Mon Sep 17 00:00:00 2001 From: Petar Vasiljevic Date: Wed, 23 Oct 2024 20:22:00 +0200 Subject: [PATCH 2/9] initial commit --- .../jdbc/v2/PostgresIntegrationSuite.scala | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 54642665c5c40..57b365ac09e4f 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.DatabaseOnDocker import org.apache.spark.sql.types._ import org.apache.spark.tags.DockerTest + /** * To run this test suite for a specific version (e.g., postgres:17.0-alpine) * {{{ @@ -107,21 +108,25 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT connection.prepareStatement("CREATE TABLE array_timestamptz (col timestamptz[])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_int VALUES (array[array[10]])").executeUpdate() - connection.prepareStatement("INSERT INTO array_bigint VALUES (array[array[10]])") - .executeUpdate() - connection.prepareStatement("INSERT INTO array_smallint VALUES (array[array[10]])") + connection.prepareStatement("INSERT INTO array_int VALUES " + + "(array[array[10]]), (array[10])").executeUpdate() + connection.prepareStatement("INSERT INTO array_bigint VALUES (array[array[10]]), (array[10])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_boolean VALUES (array[array[true]])") + connection.prepareStatement("INSERT INTO array_smallint VALUES (array[array[10]]), (array[10])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_float VALUES (array[array[10.5]])") + connection.prepareStatement("INSERT INTO array_boolean VALUES " + + "(array[array[true]]), (array[true])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_double VALUES (array[array[10.1]])") + connection.prepareStatement("INSERT INTO array_float VALUES " + + "(array[array[10.5]]), (array[10.5])").executeUpdate() + connection.prepareStatement("INSERT INTO array_double " + + "VALUES (array[array[10.1]]), (array[10.1])").executeUpdate() + connection.prepareStatement("INSERT INTO array_timestamp VALUES " + + "(array[array['2022-01-01 09:15'::timestamp]]), (array['2022-01-01 09:15'::timestamp])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_timestamp VALUES (" + - "array[array['2022-01-01 09:15'::timestamp]])").executeUpdate() connection.prepareStatement("INSERT INTO array_timestamptz VALUES " + - "(array[array['2022-01-01 09:15'::timestamptz]])").executeUpdate() + "(array[array['2022-01-01 09:15'::timestamptz]]), (array['2022-01-01 09:15'::timestamptz])") + .executeUpdate() connection.prepareStatement( "CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)") .executeUpdate() From 600600b6b1d39b9ca557374043f12c8a52a72f51 Mon Sep 17 00:00:00 2001 From: Petar Vasiljevic Date: Thu, 24 Oct 2024 13:48:18 +0200 Subject: [PATCH 3/9] fix perf --- .../jdbc/v2/PostgresIntegrationSuite.scala | 43 +++++++++++++------ .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/jdbc/PostgresDialect.scala | 6 +-- 3 files changed, 32 insertions(+), 19 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 57b365ac09e4f..a67ca8a136526 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -108,25 +108,21 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT connection.prepareStatement("CREATE TABLE array_timestamptz (col timestamptz[])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_int VALUES " + - "(array[array[10]]), (array[10])").executeUpdate() - connection.prepareStatement("INSERT INTO array_bigint VALUES (array[array[10]]), (array[10])") + connection.prepareStatement("INSERT INTO array_int VALUES (array[array[10]])").executeUpdate() + connection.prepareStatement("INSERT INTO array_bigint VALUES (array[array[10]])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_smallint VALUES (array[array[10]]), (array[10])") + connection.prepareStatement("INSERT INTO array_smallint VALUES (array[array[10]])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_boolean VALUES " + - "(array[array[true]]), (array[true])") + connection.prepareStatement("INSERT INTO array_boolean VALUES (array[array[true]])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_float VALUES " + - "(array[array[10.5]]), (array[10.5])").executeUpdate() - connection.prepareStatement("INSERT INTO array_double " + - "VALUES (array[array[10.1]]), (array[10.1])").executeUpdate() - connection.prepareStatement("INSERT INTO array_timestamp VALUES " + - "(array[array['2022-01-01 09:15'::timestamp]]), (array['2022-01-01 09:15'::timestamp])") + connection.prepareStatement("INSERT INTO array_float VALUES (array[array[10.5]])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_timestamptz VALUES " + - "(array[array['2022-01-01 09:15'::timestamptz]]), (array['2022-01-01 09:15'::timestamptz])") + connection.prepareStatement("INSERT INTO array_double VALUES (array[array[10.1]])") .executeUpdate() + connection.prepareStatement("INSERT INTO array_timestamp VALUES (" + + "array[array['2022-01-01 09:15'::timestamp]])").executeUpdate() + connection.prepareStatement("INSERT INTO array_timestamptz VALUES " + + "(array[array['2022-01-01 09:15'::timestamptz]])").executeUpdate() connection.prepareStatement( "CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)") .executeUpdate() @@ -137,6 +133,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT "VALUES (array[array[1],array[2]])").executeUpdate() connection.prepareStatement("CREATE TABLE ctas_array_of_array_of_int " + "AS SELECT * FROM array_of_array_of_int").executeUpdate() + + connection.prepareStatement("CREATE TABLE unsupported_array_of_array_of_int (col int[][])") + .executeUpdate() + connection.prepareStatement("INSERT INTO unsupported_array_of_array_of_int " + + "VALUES (array[array[1],array[2]]), (array[3])").executeUpdate() } test("Test multi-dimensional column types") { @@ -343,4 +344,18 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT checkAnswer(CTASdfWithNewBehaviour, rowsWithOldBehaviour) } } + + test("Test reading multiple dimension array from table created via CTAS command " + + "- negative test") { + withSQLConf(SQLConf.POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY.key -> "true") { + val exception = intercept[org.apache.spark.SparkSQLException] { + sql(s"SELECT * FROM $catalogName.unsupported_array_of_array_of_int").collect() + }.getMessage + + // There is 2d and 1d array in unsupported_array_of_array_of_int. Reading this table + // is not supported + assert(exception.contains("Some values in field 0 are incompatible with the" + + " column array type. Expected type \"ARRAY>\"")) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9f20bf7df23a9..d43e593b502b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4471,7 +4471,7 @@ object SQLConf { "will be done.") .version("4.0.0") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val CSV_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.csv.filterPushdown.enabled") .doc("When true, enable filter pushdown to CSV datasource.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 8b3ca6ed05a92..acfe6db9ff6a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -383,9 +383,7 @@ private case class PostgresDialect() val tableName = rsmd.getTableName(columnIdx) val columnName = rsmd.getColumnName(columnIdx) val query = if (conf.postgresProperReadingOfArrayDimensionality) { - s""" - |SELECT min(array_ndims($columnName)) FROM $tableName - |""".stripMargin + s"SELECT array_ndims($columnName)) FROM $tableName LIMIT 1" } else { s""" |SELECT pg_attribute.attndims @@ -398,7 +396,7 @@ private case class PostgresDialect() try { Using.resource(conn.createStatement()) { stmt => Using.resource(stmt.executeQuery(query)) { rs => - if (rs.next()) metadata.putLong("arrayDimension", rs.getLong(1)) + if (rs.next()) metadata.putLong("arrayDimension", Math.max(rs.getLong(1), 1)) } } } catch { From 86d020125c75c4f1edfb547d61bf9f970fcf0fc6 Mon Sep 17 00:00:00 2001 From: Petar Vasiljevic Date: Thu, 24 Oct 2024 17:51:51 +0200 Subject: [PATCH 4/9] change flag name --- .../jdbc/v2/PostgresIntegrationSuite.scala | 85 ++++++++++--------- .../apache/spark/sql/internal/SQLConf.scala | 12 +-- .../spark/sql/jdbc/PostgresDialect.scala | 14 ++- 3 files changed, 61 insertions(+), 50 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index a67ca8a136526..bdf5eb70c68ad 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -141,40 +141,43 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT } test("Test multi-dimensional column types") { - // This test is used to verify that the multi-dimensional - // column types are supported by the JDBC V2 data source. - // We do not verify any result output - // - val df = spark.read.format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "array_test_table") - .load() - df.collect() - - val array_tables = Array( - ("array_int", "\"ARRAY\""), - ("array_bigint", "\"ARRAY\""), - ("array_smallint", "\"ARRAY\""), - ("array_boolean", "\"ARRAY\""), - ("array_float", "\"ARRAY\""), - ("array_double", "\"ARRAY\""), - ("array_timestamp", "\"ARRAY\""), - ("array_timestamptz", "\"ARRAY\"") - ) - - array_tables.foreach { case (dbtable, arrayType) => - checkError( - exception = intercept[SparkSQLException] { - val df = spark.read.format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", dbtable) - .load() - df.collect() - }, - condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH", - parameters = Map("pos" -> "0", "type" -> arrayType), - sqlState = Some("0A000") + withSQLConf(SQLConf.POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE.key -> "true") { + + // This test is used to verify that the multi-dimensional + // column types are supported by the JDBC V2 data source. + // We do not verify any result output + // + val df = spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "array_test_table") + .load() + df.collect() + + val array_tables = Array( + ("array_int", "\"ARRAY\""), + ("array_bigint", "\"ARRAY\""), + ("array_smallint", "\"ARRAY\""), + ("array_boolean", "\"ARRAY\""), + ("array_float", "\"ARRAY\""), + ("array_double", "\"ARRAY\""), + ("array_timestamp", "\"ARRAY\""), + ("array_timestamptz", "\"ARRAY\"") ) + + array_tables.foreach { case (dbtable, arrayType) => + checkError( + exception = intercept[SparkSQLException] { + val df = spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", dbtable) + .load() + df.collect() + }, + condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH", + parameters = Map("pos" -> "0", "type" -> arrayType), + sqlState = Some("0A000") + ) + } } } @@ -317,37 +320,37 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT } test("Test reading 2d array from table created via CTAS command - negative test") { - withSQLConf(SQLConf.POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY.key -> "false") { - val exception = intercept[org.apache.spark.SparkException] { + withSQLConf(SQLConf.POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE.key -> "true") { + val exception = intercept[org.apache.spark.SparkSQLException] { sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int").collect() }.getMessage - assert(exception.contains("""Bad value for type int : {{1},{2}}""")) + assert(exception.contains("Some values in field 0 are incompatible " + + "with the column array type. Expected type \"ARRAY\"")) } } test("Test reading 2d array from table created via CTAS command - positive test") { val rowsWithOldBehaviour = withSQLConf( - SQLConf.POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY.key -> "false" + SQLConf.POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE.key -> "true" ) { sql(s"SELECT * FROM $catalogName.array_of_array_of_int").collect() } withSQLConf( - SQLConf.POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY.key -> "true" + SQLConf.POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE.key -> "false" ) { val dfWithNewBehaviour = sql(s"SELECT * FROM $catalogName.array_of_array_of_int") val CTASdfWithNewBehaviour = sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int") - checkAnswer(dfWithNewBehaviour, rowsWithOldBehaviour) - checkAnswer(CTASdfWithNewBehaviour, rowsWithOldBehaviour) + checkAnswer(CTASdfWithNewBehaviour, dfWithNewBehaviour.collect()) } } test("Test reading multiple dimension array from table created via CTAS command " + "- negative test") { - withSQLConf(SQLConf.POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY.key -> "true") { + withSQLConf(SQLConf.POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE.key -> "false") { val exception = intercept[org.apache.spark.SparkSQLException] { sql(s"SELECT * FROM $catalogName.unsupported_array_of_array_of_int").collect() }.getMessage diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d43e593b502b8..4fbf612864145 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4463,12 +4463,12 @@ object SQLConf { .booleanConf .createWithDefault(false) - val POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY = - buildConf("spark.sql.legacy.postgres.properReadingOfArrayDimensionality") + val POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE = + buildConf("spark.sql.legacy.postgres.getArrayDimFromPgAttribute") .internal() .doc("When true postgres will be asked to return the dimension of array column" + - "by calling array_ndims(col). When false, old way of calling pg_attribute.attndims" + - "will be done.") + "by querying the pg_attribute table. When false, function array_ndims will be called on" + + "first record of the table to get the dimension of array column") .version("4.0.0") .booleanConf .createWithDefault(false) @@ -5738,8 +5738,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def legacyPostgresDatetimeMappingEnabled: Boolean = getConf(LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED) - def postgresProperReadingOfArrayDimensionality: Boolean = - getConf(POSTGRES_PROPER_READING_OF_ARRAY_DIMENSIONALITY) + def getArrayDimFromPgAttribute: Boolean = + getConf(POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE) override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = { LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index acfe6db9ff6a7..0014ec0330735 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -382,9 +382,15 @@ private case class PostgresDialect() case Types.ARRAY => val tableName = rsmd.getTableName(columnIdx) val columnName = rsmd.getColumnName(columnIdx) - val query = if (conf.postgresProperReadingOfArrayDimensionality) { - s"SELECT array_ndims($columnName)) FROM $tableName LIMIT 1" - } else { + val query = if (conf.getArrayDimFromPgAttribute) { + /* + We send the query to Pg to read from pg_attribute table. There is one caveat here: + If table is created with CTAS command on Pg side, the information about dimensionality + of array column will be lost and attndims will be 0. Therefore, for example, + array will be mapped to int. If this is the case, it can be resolved by setting + spark.sql.legacy.postgres.getArrayDimFromPgAttribute flag to false. + */ + s""" |SELECT pg_attribute.attndims |FROM pg_attribute @@ -392,6 +398,8 @@ private case class PostgresDialect() | JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName' |""".stripMargin + } else { + s"SELECT array_ndims($columnName) FROM $tableName LIMIT 1" } try { Using.resource(conn.createStatement()) { stmt => From 7ff3d6705750c473ce3f4252979a0cdb6a164a7c Mon Sep 17 00:00:00 2001 From: Petar Vasiljevic Date: Fri, 25 Oct 2024 13:56:56 +0200 Subject: [PATCH 5/9] remove config --- .../jdbc/v2/PostgresIntegrationSuite.scala | 146 +++++++----------- .../apache/spark/sql/internal/SQLConf.scala | 13 -- .../spark/sql/jdbc/PostgresDialect.scala | 20 +-- 3 files changed, 61 insertions(+), 118 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index bdf5eb70c68ad..3890fbeef409b 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -23,7 +23,6 @@ import org.apache.spark.{SparkConf, SparkSQLException} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.DatabaseOnDocker import org.apache.spark.sql.types._ import org.apache.spark.tags.DockerTest @@ -108,20 +107,23 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT connection.prepareStatement("CREATE TABLE array_timestamptz (col timestamptz[])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_int VALUES (array[array[10]])").executeUpdate() - connection.prepareStatement("INSERT INTO array_bigint VALUES (array[array[10]])") + connection.prepareStatement("INSERT INTO array_int VALUES (array[10]), (array[array[10]])") .executeUpdate() - connection.prepareStatement("INSERT INTO array_smallint VALUES (array[array[10]])") - .executeUpdate() - connection.prepareStatement("INSERT INTO array_boolean VALUES (array[array[true]])") - .executeUpdate() - connection.prepareStatement("INSERT INTO array_float VALUES (array[array[10.5]])") - .executeUpdate() - connection.prepareStatement("INSERT INTO array_double VALUES (array[array[10.1]])") - .executeUpdate() - connection.prepareStatement("INSERT INTO array_timestamp VALUES (" + - "array[array['2022-01-01 09:15'::timestamp]])").executeUpdate() + connection.prepareStatement("INSERT INTO array_bigint VALUES (array[10]), " + + "(array[array[10]])").executeUpdate() + connection.prepareStatement("INSERT INTO array_smallint VALUES (array[10]), " + + "(array[array[10]])").executeUpdate() + connection.prepareStatement("INSERT INTO array_boolean VALUES (array[true]), " + + "(array[array[true]])").executeUpdate() + connection.prepareStatement("INSERT INTO array_float VALUES (array[10.5]), " + + "(array[array[10.5]])").executeUpdate() + connection.prepareStatement("INSERT INTO array_double VALUES (array[10.1]), " + + "(array[array[10.1]])").executeUpdate() + connection.prepareStatement("INSERT INTO array_timestamp VALUES " + + "(array['2022-01-01 09:15'::timestamp]), " + + "(array[array['2022-01-01 09:15'::timestamp]])").executeUpdate() connection.prepareStatement("INSERT INTO array_timestamptz VALUES " + + "(array['2022-01-01 09:15'::timestamptz]), " + "(array[array['2022-01-01 09:15'::timestamptz]])").executeUpdate() connection.prepareStatement( "CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)") @@ -141,43 +143,40 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT } test("Test multi-dimensional column types") { - withSQLConf(SQLConf.POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE.key -> "true") { - - // This test is used to verify that the multi-dimensional - // column types are supported by the JDBC V2 data source. - // We do not verify any result output - // - val df = spark.read.format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", "array_test_table") - .load() - df.collect() - - val array_tables = Array( - ("array_int", "\"ARRAY\""), - ("array_bigint", "\"ARRAY\""), - ("array_smallint", "\"ARRAY\""), - ("array_boolean", "\"ARRAY\""), - ("array_float", "\"ARRAY\""), - ("array_double", "\"ARRAY\""), - ("array_timestamp", "\"ARRAY\""), - ("array_timestamptz", "\"ARRAY\"") - ) + // This test is used to verify that the multi-dimensional + // column types are supported by the JDBC V2 data source. + // We do not verify any result output + // + val df = spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "array_test_table") + .load() + df.collect() + + val array_tables = Array( + ("array_int", "\"ARRAY\""), + ("array_bigint", "\"ARRAY\""), + ("array_smallint", "\"ARRAY\""), + ("array_boolean", "\"ARRAY\""), + ("array_float", "\"ARRAY\""), + ("array_double", "\"ARRAY\""), + ("array_timestamp", "\"ARRAY\""), + ("array_timestamptz", "\"ARRAY\"") + ) - array_tables.foreach { case (dbtable, arrayType) => - checkError( - exception = intercept[SparkSQLException] { - val df = spark.read.format("jdbc") - .option("url", jdbcUrl) - .option("dbtable", dbtable) - .load() - df.collect() - }, - condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH", - parameters = Map("pos" -> "0", "type" -> arrayType), - sqlState = Some("0A000") - ) - } + array_tables.foreach { case (dbtable, arrayType) => + checkError( + exception = intercept[SparkSQLException] { + val df = spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", dbtable) + .load() + df.collect() + }, + condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH", + parameters = Map("pos" -> "0", "type" -> arrayType), + sqlState = Some("0A000") + ) } } @@ -319,46 +318,21 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT assert(rows10(1).getString(0) === "alex") } - test("Test reading 2d array from table created via CTAS command - negative test") { - withSQLConf(SQLConf.POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE.key -> "true") { - val exception = intercept[org.apache.spark.SparkSQLException] { - sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int").collect() - }.getMessage + test("Test reading 2d array from table created via CTAS command") { + val dfWithNewBehaviour = sql(s"SELECT * FROM $catalogName.array_of_array_of_int") + val CTASdfWithNewBehaviour = sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int") - assert(exception.contains("Some values in field 0 are incompatible " + - "with the column array type. Expected type \"ARRAY\"")) - } + checkAnswer(CTASdfWithNewBehaviour, dfWithNewBehaviour.collect()) } - test("Test reading 2d array from table created via CTAS command - positive test") { - val rowsWithOldBehaviour = - withSQLConf( - SQLConf.POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE.key -> "true" - ) { - sql(s"SELECT * FROM $catalogName.array_of_array_of_int").collect() - } - - withSQLConf( - SQLConf.POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE.key -> "false" - ) { - val dfWithNewBehaviour = sql(s"SELECT * FROM $catalogName.array_of_array_of_int") - val CTASdfWithNewBehaviour = sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int") - - checkAnswer(CTASdfWithNewBehaviour, dfWithNewBehaviour.collect()) - } - } + test("Test reading multiple dimension array from table created via CTAS command") { + val exception = intercept[org.apache.spark.SparkSQLException] { + sql(s"SELECT * FROM $catalogName.unsupported_array_of_array_of_int").collect() + }.getMessage - test("Test reading multiple dimension array from table created via CTAS command " + - "- negative test") { - withSQLConf(SQLConf.POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE.key -> "false") { - val exception = intercept[org.apache.spark.SparkSQLException] { - sql(s"SELECT * FROM $catalogName.unsupported_array_of_array_of_int").collect() - }.getMessage - - // There is 2d and 1d array in unsupported_array_of_array_of_int. Reading this table - // is not supported - assert(exception.contains("Some values in field 0 are incompatible with the" + - " column array type. Expected type \"ARRAY>\"")) - } + // There is 2d and 1d array in unsupported_array_of_array_of_int. Reading this table + // is not supported + assert(exception.contains("Some values in field 0 are incompatible with the" + + " column array type. Expected type \"ARRAY>\"")) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4fbf612864145..5218a683a8fa8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4463,16 +4463,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE = - buildConf("spark.sql.legacy.postgres.getArrayDimFromPgAttribute") - .internal() - .doc("When true postgres will be asked to return the dimension of array column" + - "by querying the pg_attribute table. When false, function array_ndims will be called on" + - "first record of the table to get the dimension of array column") - .version("4.0.0") - .booleanConf - .createWithDefault(false) - val CSV_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.csv.filterPushdown.enabled") .doc("When true, enable filter pushdown to CSV datasource.") .version("3.0.0") @@ -5738,9 +5728,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def legacyPostgresDatetimeMappingEnabled: Boolean = getConf(LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED) - def getArrayDimFromPgAttribute: Boolean = - getConf(POSTGRES_GET_ARRAY_DIM_FROM_PG_ATTRIBUTE_METADATA_TABLE) - override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = { LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 0014ec0330735..3c3e4bfcd0445 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -382,25 +382,7 @@ private case class PostgresDialect() case Types.ARRAY => val tableName = rsmd.getTableName(columnIdx) val columnName = rsmd.getColumnName(columnIdx) - val query = if (conf.getArrayDimFromPgAttribute) { - /* - We send the query to Pg to read from pg_attribute table. There is one caveat here: - If table is created with CTAS command on Pg side, the information about dimensionality - of array column will be lost and attndims will be 0. Therefore, for example, - array will be mapped to int. If this is the case, it can be resolved by setting - spark.sql.legacy.postgres.getArrayDimFromPgAttribute flag to false. - */ - - s""" - |SELECT pg_attribute.attndims - |FROM pg_attribute - | JOIN pg_class ON pg_attribute.attrelid = pg_class.oid - | JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid - |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName' - |""".stripMargin - } else { - s"SELECT array_ndims($columnName) FROM $tableName LIMIT 1" - } + val query = s"SELECT array_ndims($columnName) FROM $tableName LIMIT 1" try { Using.resource(conn.createStatement()) { stmt => Using.resource(stmt.executeQuery(query)) { rs => From e699710de55a5c7853c261cfe8351333f07c0a7e Mon Sep 17 00:00:00 2001 From: Petar Vasiljevic Date: Mon, 28 Oct 2024 15:13:16 +0100 Subject: [PATCH 6/9] revert --- .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 3c3e4bfcd0445..76cb42719d8da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -382,11 +382,17 @@ private case class PostgresDialect() case Types.ARRAY => val tableName = rsmd.getTableName(columnIdx) val columnName = rsmd.getColumnName(columnIdx) + /* + Spark does not support different dimensionality per row, therefore we retrieve the + dimensionality of any row from Postgres. This might fail later on as Postgres allows + different dimensions per row for arrays. + */ + val query = s"SELECT array_ndims($columnName) FROM $tableName LIMIT 1" try { Using.resource(conn.createStatement()) { stmt => Using.resource(stmt.executeQuery(query)) { rs => - if (rs.next()) metadata.putLong("arrayDimension", Math.max(rs.getLong(1), 1)) + if (rs.next()) metadata.putLong("arrayDimension", rs.getLong(1)) } } } catch { From 55dd1f68969d4f54daa52ef983aadc087f520459 Mon Sep 17 00:00:00 2001 From: Petar Vasiljevic Date: Tue, 29 Oct 2024 17:35:53 +0100 Subject: [PATCH 7/9] add fallback --- .../jdbc/v2/PostgresIntegrationSuite.scala | 15 ++++----- .../spark/sql/jdbc/PostgresDialect.scala | 32 +++++++++++++++++-- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 3890fbeef409b..baa859e95f1a2 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -326,13 +326,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT } test("Test reading multiple dimension array from table created via CTAS command") { - val exception = intercept[org.apache.spark.SparkSQLException] { - sql(s"SELECT * FROM $catalogName.unsupported_array_of_array_of_int").collect() - }.getMessage - - // There is 2d and 1d array in unsupported_array_of_array_of_int. Reading this table - // is not supported - assert(exception.contains("Some values in field 0 are incompatible with the" + - " column array type. Expected type \"ARRAY>\"")) + checkError( + exception = intercept[org.apache.spark.SparkSQLException] { + sql(s"SELECT * FROM $catalogName.unsupported_array_of_array_of_int").collect() + }, + condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH", + parameters = Map("pos" -> "0", "type" -> "\"ARRAY>\""), + ) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 76cb42719d8da..cbd087a814269 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -382,17 +382,45 @@ private case class PostgresDialect() case Types.ARRAY => val tableName = rsmd.getTableName(columnIdx) val columnName = rsmd.getColumnName(columnIdx) + /* Spark does not support different dimensionality per row, therefore we retrieve the dimensionality of any row from Postgres. This might fail later on as Postgres allows different dimensions per row for arrays. */ - val query = s"SELECT array_ndims($columnName) FROM $tableName LIMIT 1" + var arrayDimensionalityResolveNeedsFallback = true + try { Using.resource(conn.createStatement()) { stmt => Using.resource(stmt.executeQuery(query)) { rs => - if (rs.next()) metadata.putLong("arrayDimension", rs.getLong(1)) + if (rs.next()) { + metadata.putLong("arrayDimension", rs.getLong(1)) + arrayDimensionalityResolveNeedsFallback = false + } + } + } + + if (arrayDimensionalityResolveNeedsFallback) { + /* + In case that table doesn't contain any rows, previous query won't resolve dimensionality. + Therefore, fallback to dimension resolution from metadata table. + */ + val fallbackQuery = + s""" + |SELECT pg_attribute.attndims + |FROM pg_attribute + | JOIN pg_class ON pg_attribute.attrelid = pg_class.oid + | JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid + |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName' + |""".stripMargin + + Using.resource(conn.createStatement()) { stmt => + Using.resource(stmt.executeQuery(fallbackQuery)) { rs => + if (rs.next()) { + metadata.putLong("arrayDimension", rs.getLong(1)) + } + } } } } catch { From f008d4192884aec564b5908a17b7b1ea394af444 Mon Sep 17 00:00:00 2001 From: Petar Vasiljevic Date: Wed, 30 Oct 2024 13:55:00 +0100 Subject: [PATCH 8/9] refactor --- .../jdbc/v2/PostgresIntegrationSuite.scala | 2 +- .../spark/sql/jdbc/PostgresDialect.scala | 45 +++++++++---------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index baa859e95f1a2..1dff2b378a6bf 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -331,7 +331,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT sql(s"SELECT * FROM $catalogName.unsupported_array_of_array_of_int").collect() }, condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH", - parameters = Map("pos" -> "0", "type" -> "\"ARRAY>\""), + parameters = Map("pos" -> "0", "type" -> "\"ARRAY>\"") ) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index cbd087a814269..c3a50b8d4ba32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -389,36 +389,33 @@ private case class PostgresDialect() different dimensions per row for arrays. */ val query = s"SELECT array_ndims($columnName) FROM $tableName LIMIT 1" - var arrayDimensionalityResolveNeedsFallback = true try { Using.resource(conn.createStatement()) { stmt => Using.resource(stmt.executeQuery(query)) { rs => if (rs.next()) { metadata.putLong("arrayDimension", rs.getLong(1)) - arrayDimensionalityResolveNeedsFallback = false - } - } - } - - if (arrayDimensionalityResolveNeedsFallback) { - /* - In case that table doesn't contain any rows, previous query won't resolve dimensionality. - Therefore, fallback to dimension resolution from metadata table. - */ - val fallbackQuery = - s""" - |SELECT pg_attribute.attndims - |FROM pg_attribute - | JOIN pg_class ON pg_attribute.attrelid = pg_class.oid - | JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid - |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName' - |""".stripMargin - - Using.resource(conn.createStatement()) { stmt => - Using.resource(stmt.executeQuery(fallbackQuery)) { rs => - if (rs.next()) { - metadata.putLong("arrayDimension", rs.getLong(1)) + } else { + /* + If previous query doesn't return any rows, we should fallback to querying the + Postgres metadata table. + */ + val fallbackQuery = + s""" + |SELECT pg_attribute.attndims + |FROM pg_attribute + | JOIN pg_class ON pg_attribute.attrelid = pg_class.oid + | JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid + |WHERE pg_class.relname = '$tableName' + |AND pg_attribute.attname = '$columnName' + |""".stripMargin + + Using.resource(conn.createStatement()) { stmt2 => + Using.resource(stmt2.executeQuery(fallbackQuery)) { rs2 => + if (rs2.next()) { + metadata.putLong("arrayDimension", rs2.getLong(1)) + } + } } } } From 5475b9f03b7b82f4f971a56b8048129563bf9933 Mon Sep 17 00:00:00 2001 From: Petar Vasiljevic Date: Tue, 12 Nov 2024 10:55:01 +0100 Subject: [PATCH 9/9] refactor --- .../jdbc/v2/PostgresIntegrationSuite.scala | 28 ++++++++++-- .../spark/sql/jdbc/PostgresDialect.scala | 44 +++++-------------- 2 files changed, 35 insertions(+), 37 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 1dff2b378a6bf..83b665853c535 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -129,6 +129,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT "CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)") .executeUpdate() + connection.prepareStatement("CREATE TABLE array_of_int (col int[])") + .executeUpdate() + connection.prepareStatement("INSERT INTO array_of_int " + + "VALUES (array[1])").executeUpdate() + connection.prepareStatement("CREATE TABLE ctas_array_of_int " + + "AS SELECT * FROM array_of_int").executeUpdate() + connection.prepareStatement("CREATE TABLE array_of_array_of_int (col int[][])") .executeUpdate() connection.prepareStatement("INSERT INTO array_of_array_of_int " + @@ -318,11 +325,24 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT assert(rows10(1).getString(0) === "alex") } - test("Test reading 2d array from table created via CTAS command") { - val dfWithNewBehaviour = sql(s"SELECT * FROM $catalogName.array_of_array_of_int") - val CTASdfWithNewBehaviour = sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int") + test("Test reading 2d array from table created via CTAS command - positive test") { + val dfNoCTASTable = sql(s"SELECT * FROM $catalogName.array_of_int") + val dfWithCTASTable = sql(s"SELECT * FROM $catalogName.ctas_array_of_int") + + checkAnswer(dfWithCTASTable, dfNoCTASTable.collect()) + } + + test("Test reading 2d array from table created via CTAS command - negative test") { + val dfNoCTASTable = sql(s"SELECT * FROM $catalogName.array_of_int") - checkAnswer(CTASdfWithNewBehaviour, dfWithNewBehaviour.collect()) + checkError( + exception = intercept[org.apache.spark.SparkSQLException] { + // This should fail as only 1D CTAS tables are supported + sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int").collect() + }, + condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH", + parameters = Map("pos" -> "0", "type" -> "\"ARRAY\"") + ) } test("Test reading multiple dimension array from table created via CTAS command") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index c3a50b8d4ba32..04a62298a49f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -382,42 +382,20 @@ private case class PostgresDialect() case Types.ARRAY => val tableName = rsmd.getTableName(columnIdx) val columnName = rsmd.getColumnName(columnIdx) - - /* - Spark does not support different dimensionality per row, therefore we retrieve the - dimensionality of any row from Postgres. This might fail later on as Postgres allows - different dimensions per row for arrays. - */ - val query = s"SELECT array_ndims($columnName) FROM $tableName LIMIT 1" - + val query = + s""" + |SELECT pg_attribute.attndims + |FROM pg_attribute + | JOIN pg_class ON pg_attribute.attrelid = pg_class.oid + | JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid + |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName' + |""".stripMargin try { Using.resource(conn.createStatement()) { stmt => Using.resource(stmt.executeQuery(query)) { rs => - if (rs.next()) { - metadata.putLong("arrayDimension", rs.getLong(1)) - } else { - /* - If previous query doesn't return any rows, we should fallback to querying the - Postgres metadata table. - */ - val fallbackQuery = - s""" - |SELECT pg_attribute.attndims - |FROM pg_attribute - | JOIN pg_class ON pg_attribute.attrelid = pg_class.oid - | JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid - |WHERE pg_class.relname = '$tableName' - |AND pg_attribute.attname = '$columnName' - |""".stripMargin - - Using.resource(conn.createStatement()) { stmt2 => - Using.resource(stmt2.executeQuery(fallbackQuery)) { rs2 => - if (rs2.next()) { - metadata.putLong("arrayDimension", rs2.getLong(1)) - } - } - } - } + // Metadata can return 0 for CTAS tables. For such tables, we are always reading + // them as 1D array + if (rs.next()) metadata.putLong("arrayDimension", Math.max(1L, rs.getLong(1))) } } } catch {