From 304d3d0bd5e57837bf14a457d12f5d248598d8b9 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Mon, 25 Jan 2016 19:08:00 +0900 Subject: [PATCH 1/3] Support ArrayType(DecimalType) in Postgres JDBC --- .../apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 8 +++++--- .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 7d011be37067b..3090755b9cc8e 100644 --- a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -41,10 +41,10 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { conn.setCatalog("foo") conn.prepareStatement("CREATE TABLE bar (c0 text, c1 integer, c2 double precision, c3 bigint, " + "c4 bit(1), c5 bit(10), c6 bytea, c7 boolean, c8 inet, c9 cidr, " - + "c10 integer[], c11 text[], c12 real[])").executeUpdate() + + "c10 integer[], c11 text[], c12 real[], c13 decimal(38,18)[])").executeUpdate() conn.prepareStatement("INSERT INTO bar VALUES ('hello', 42, 1.25, 123456789012345, B'0', " + "B'1000100101', E'\\\\xDEADBEEF', true, '172.16.0.42', '192.168.0.0/16', " - + """'{1, 2}', '{"a", null, "b"}', '{0.11, 0.22}')""").executeUpdate() + + """'{1, 2}', '{"a", null, "b"}', '{0.11, 0.22}', '{1.5, 3.25}')""").executeUpdate() } test("Type mapping for various types") { @@ -52,7 +52,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass) - assert(types.length == 13) + assert(types.length == 14) assert(classOf[String].isAssignableFrom(types(0))) assert(classOf[java.lang.Integer].isAssignableFrom(types(1))) 
assert(classOf[java.lang.Double].isAssignableFrom(types(2))) @@ -66,6 +66,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { assert(classOf[Seq[Int]].isAssignableFrom(types(10))) assert(classOf[Seq[String]].isAssignableFrom(types(11))) assert(classOf[Seq[Double]].isAssignableFrom(types(12))) + assert(classOf[Seq[java.math.BigDecimal]].isAssignableFrom(types(13))) assert(rows(0).getString(0).equals("hello")) assert(rows(0).getInt(1) == 42) assert(rows(0).getDouble(2) == 1.25) @@ -82,6 +83,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { assert(rows(0).getSeq(10) == Seq(1, 2)) assert(rows(0).getSeq(11) == Seq("a", null, "b")) assert(rows(0).getSeq(12).toSeq == Seq(0.11f, 0.22f)) + assert(rows(0).getSeq[java.math.BigDecimal](13).map(_.doubleValue).toSeq == Seq(1.5, 3.25)) } test("Basic write test") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 8d43966480a65..6bda91dcdade3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -62,6 +62,7 @@ private object PostgresDialect extends JdbcDialect { case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN)) case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT)) case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE)) + case t: DecimalType => Option(JdbcType(s"DECIMAL", java.sql.Types.DECIMAL)) case ArrayType(et, _) if et.isInstanceOf[AtomicType] => getJDBCType(et).map(_.databaseTypeDefinition) .orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition)) From 36261329702c16575000073654412ae16aef68e6 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Tue, 26 Jan 2016 00:25:15 +0900 Subject: [PATCH 2/3] Fix bad comparisons for BigDecimal types --- .../org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 5 ++++- 1 file changed, 4 
insertions(+), 1 deletion(-) diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 3090755b9cc8e..554abfa72949d 100644 --- a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -83,7 +83,10 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { assert(rows(0).getSeq(10) == Seq(1, 2)) assert(rows(0).getSeq(11) == Seq("a", null, "b")) assert(rows(0).getSeq(12).toSeq == Seq(0.11f, 0.22f)) - assert(rows(0).getSeq[java.math.BigDecimal](13).map(_.doubleValue).toSeq == Seq(1.5, 3.25)) + val c13Expected = Seq(1.5, 3.25).map(new java.math.BigDecimal(_)) + assert(rows(0).getSeq[java.math.BigDecimal](13).zipWithIndex.forall { case (v, idx) => + v.compareTo(c13Expected(idx)) == 0 + }) } test("Basic write test") { From 52eaebea0cf2650ee1aff4c0eb2d7dfd706d655b Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Thu, 28 Jan 2016 13:50:33 +0900 Subject: [PATCH 3/3] Add tests in PostgresIntegrationSuite.scala --- .../sql/jdbc/PostgresIntegrationSuite.scala | 19 ++++++++++--------- .../spark/sql/jdbc/PostgresDialect.scala | 1 - 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 554abfa72949d..686ce0b0f0537 100644 --- a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -41,10 +41,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { conn.setCatalog("foo") 
conn.prepareStatement("CREATE TABLE bar (c0 text, c1 integer, c2 double precision, c3 bigint, " + "c4 bit(1), c5 bit(10), c6 bytea, c7 boolean, c8 inet, c9 cidr, " - + "c10 integer[], c11 text[], c12 real[], c13 decimal(38,18)[])").executeUpdate() + + "c10 integer[], c11 text[], c12 real[], c13 decimal(38,18)[], c14 numeric(14, 7)[])") + .executeUpdate() conn.prepareStatement("INSERT INTO bar VALUES ('hello', 42, 1.25, 123456789012345, B'0', " + "B'1000100101', E'\\\\xDEADBEEF', true, '172.16.0.42', '192.168.0.0/16', " - + """'{1, 2}', '{"a", null, "b"}', '{0.11, 0.22}', '{1.5, 3.25}')""").executeUpdate() + + """'{1, 2}', '{"a", null, "b"}', '{0.11, 0.22}', '{1.5, 3.25}', '{3.8, 4.9}')""") + .executeUpdate() } test("Type mapping for various types") { @@ -52,7 +54,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass) - assert(types.length == 14) + assert(types.length == 15) assert(classOf[String].isAssignableFrom(types(0))) assert(classOf[java.lang.Integer].isAssignableFrom(types(1))) assert(classOf[java.lang.Double].isAssignableFrom(types(2))) @@ -67,26 +69,25 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { assert(classOf[Seq[String]].isAssignableFrom(types(11))) assert(classOf[Seq[Double]].isAssignableFrom(types(12))) assert(classOf[Seq[java.math.BigDecimal]].isAssignableFrom(types(13))) + assert(classOf[Seq[java.math.BigDecimal]].isAssignableFrom(types(14))) assert(rows(0).getString(0).equals("hello")) assert(rows(0).getInt(1) == 42) assert(rows(0).getDouble(2) == 1.25) assert(rows(0).getLong(3) == 123456789012345L) - assert(rows(0).getBoolean(4) == false) + assert(rows(0).getBoolean(4)) // BIT(10)'s come back as ASCII strings of ten ASCII 0's and 1's... 
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](5), Array[Byte](49, 48, 48, 48, 49, 48, 48, 49, 48, 49))) assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), Array[Byte](0xDE.toByte, 0xAD.toByte, 0xBE.toByte, 0xEF.toByte))) - assert(rows(0).getBoolean(7) == true) + assert(rows(0).getBoolean(7)) assert(rows(0).getString(8) == "172.16.0.42") assert(rows(0).getString(9) == "192.168.0.0/16") assert(rows(0).getSeq(10) == Seq(1, 2)) assert(rows(0).getSeq(11) == Seq("a", null, "b")) assert(rows(0).getSeq(12).toSeq == Seq(0.11f, 0.22f)) - val c13Expected = Seq(1.5, 3.25).map(new java.math.BigDecimal(_)) - assert(rows(0).getSeq[java.math.BigDecimal](13).zipWithIndex.forall { case (v, idx) => - v.compareTo(c13Expected(idx)) == 0 - }) + assert(rows(0).getSeq(13) == Seq("1.5", "3.25").map(BigDecimal(_).bigDecimal)) + assert(rows(0).getSeq(14) == Seq("3.8", "4.9").map(BigDecimal(_).bigDecimal)) } test("Basic write test") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 6bda91dcdade3..8e076a6fbdd77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -52,7 +52,6 @@ private object PostgresDialect extends JdbcDialect { case "bytea" => Some(BinaryType) case "timestamp" | "timestamptz" | "time" | "timetz" => Some(TimestampType) case "date" => Some(DateType) - case "numeric" => Some(DecimalType.SYSTEM_DEFAULT) case _ => None }