issue #607 Format SQL statements. Fix upper/lower case.
wajda committed Mar 31, 2023
1 parent b9005ab · commit a81b239
Showing 18 changed files with 78 additions and 78 deletions.
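
A note on reading this view: the +/- gutter markers were lost when the page was captured, so each changed statement appears twice, the pre-change line first and the reformatted line immediately below it. Every change is cosmetic: Spark SQL keywords, type names, and unquoted identifiers are case-insensitive, so old and new statements are semantically identical. For example (an illustrative snippet, not taken from the diff):

    spark.sql("create table t (num int) using parquet")  // pre-commit style
    spark.sql("CREATE TABLE t (num INT) USING PARQUET")  // post-commit style: same table either way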

@@ -38,22 +38,22 @@ object DeltaDSV2Job extends SparkApp(
spark.sql(s"CREATE DATABASE dsv2 LOCATION '$path'")

//AppendData
spark.sql("CREATE TABLE dsv2.ad ( foo String ) USING DELTA")
spark.sql("CREATE TABLE dsv2.ad (foo STRING) USING DELTA")
spark.sql("INSERT INTO dsv2.ad VALUES ('Mouse')")

//OverwriteByExpression with condition == true
spark.sql("CREATE TABLE dsv2.owbe ( foo String ) USING DELTA")
spark.sql("INSERT OVERWRITE dsv2.owbe VALUES ('Dog')")
spark.sql("CREATE TABLE dsv2.owbe (foo STRING) USING DELTA")
spark.sql("INSERT OVERWRITE dsv2.owbe VALUES ('Dog')")

//OverwriteByExpression with advanced condition
spark.sql(s"CREATE TABLE dsv2.owbep (ID int, NAME string) USING delta PARTITIONED BY (ID)")
spark.sql(s"CREATE TABLE dsv2.owbep (id INT, name STRING) USING delta PARTITIONED BY (id)")
spark.sql("INSERT OVERWRITE dsv2.owbep PARTITION (ID = 222222) VALUES ('Cat')")

//CreateTableAsSelect
spark.sql("CREATE TABLE dsv2.ctas USING DELTA AS SELECT * FROM dsv2.ad;")

//ReplaceTableAsSelect
spark.sql(s"CREATE TABLE dsv2.rtas (toBeOrNotToBe boolean) USING delta")
spark.sql(s"CREATE TABLE dsv2.rtas (toBeOrNotToBe boolean) USING DELTA")
val data = spark.sql(s"SELECT * FROM dsv2.ad")
data.write.format("delta")
.mode("overwrite")
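
Aside: the statements in this job each exercise a distinct DataSourceV2 write command (AppendData, OverwriteByExpression, CreateTableAsSelect, ReplaceTableAsSelect), which is what the lineage listener hooks into. A hedged sketch of the first two expressed through the DataFrame API instead, assuming the dsv2 tables above already exist (this snippet is not part of the commit):

    import spark.implicits._
    Seq("Mouse").toDF("foo").write.format("delta")
      .mode("append").saveAsTable("dsv2.ad")      // plans an AppendData command
    Seq("Dog").toDF("foo").write.format("delta")
      .mode("overwrite").insertInto("dsv2.owbe")  // plans an OverwriteByExpression command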

@@ -37,13 +37,13 @@ object DeltaMergeDSV2Job extends SparkApp(

spark.sql(s"CREATE DATABASE dsv2 LOCATION '$path'")

spark.sql("CREATE TABLE dsv2.foo ( id INT, code STRING, name STRING ) USING DELTA")
spark.sql("CREATE TABLE dsv2.foo (id INT, code STRING, name STRING) USING DELTA")
spark.sql("INSERT INTO dsv2.foo VALUES (1014, 'PLN', 'Warsaw'), (1002, 'FRA', 'Corte')")

spark.sql("CREATE TABLE dsv2.fooUpdate ( id INT, name STRING ) USING DELTA")
spark.sql("CREATE TABLE dsv2.fooUpdate (id INT, name STRING) USING DELTA")
spark.sql("INSERT INTO dsv2.fooUpdate VALUES (1014, 'Lodz'), (1003, 'Prague')")

spark.sql("CREATE TABLE dsv2.barUpdate ( id INT, name STRING ) USING DELTA")
spark.sql("CREATE TABLE dsv2.barUpdate (id INT, name STRING) USING DELTA")
spark.sql("INSERT INTO dsv2.barUpdate VALUES (4242, 'Paris'), (3342, 'Bordeaux')")

spark.sql("UPDATE dsv2.foo SET name = 'Korok' WHERE id == 1002")
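
The visible hunk covers only the table setup and a preliminary UPDATE; the MERGE statement itself sits below the fold. For orientation, a hedged sketch of what a Delta MERGE over these tables could look like (standard Delta Lake MERGE syntax; the job's actual statement is not shown here):

    spark.sql(
      """MERGE INTO dsv2.foo AS dst
        |USING dsv2.fooUpdate AS src
        |ON dst.id = src.id
        |WHEN MATCHED THEN UPDATE SET dst.name = src.name
        |WHEN NOT MATCHED THEN INSERT (id, code, name) VALUES (src.id, 'N/A', src.name)
        |""".stripMargin)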

@@ -45,7 +45,7 @@ class AttributeReorderingFilterSpec extends AsyncFlatSpec

for {
(plan, _) <- lineageCaptor.lineageOf {
spark.sql(s"CREATE TABLE t1 (i int, j int) USING delta")
spark.sql(s"CREATE TABLE t1 (i int, j int) USING DELTA")

Seq((3, 4)).toDF("j", "i")
.write.format("delta")
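
Context for this spec: the frame's columns ("j", "i") are deliberately swapped relative to the table schema (i, j), which is exactly the situation an attribute-reordering filter has to reconcile. A general Spark note, not something this commit touches: saveAsTable in append mode resolves columns by name, whereas insertInto resolves them by position:

    val df = Seq((3, 4)).toDF("j", "i")
    df.write.format("delta").mode("append").saveAsTable("t1") // by name:     writes i = 4, j = 3
    df.write.format("delta").mode("append").insertInto("t1")  // by position: writes i = 3, j = 4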

@@ -39,7 +39,7 @@ class BasicIntegrationTests extends AsyncFlatSpec
import spark.implicits._

withNewSparkSession {
_.sql("drop table if exists someTable")
_.sql("DROP TABLE IF EXISTS someTable")
}

for {

@@ -81,7 +81,7 @@ class BasicIntegrationTests extends AsyncFlatSpec
val path = TempDirectory("spline", ".table").deleteOnExit().path.toUri

withNewSparkSession {
_.sql(s"create table $tableName (num int) using parquet location '$path' ")
_.sql(s"CREATE TABLE $tableName (num INT) USING PARQUET LOCATION '$path' ")
}

val schema: StructType = StructType(List(StructField("num", IntegerType, nullable = true)))

@@ -103,8 +103,8 @@ class BasicIntegrationTests extends AsyncFlatSpec
val path = TempDirectory("spline", ".table").deleteOnExit().path.toUri

withNewSparkSession { innerSpark =>
innerSpark.sql(s"drop table if exists $tableName")
innerSpark.sql(s"create table $tableName (num int) using parquet location '$path' ")
innerSpark.sql(s"DROP TABLE IF EXISTS $tableName")
innerSpark.sql(s"CREATE TABLE $tableName (num INT) USING PARQUET LOCATION '$path' ")
}

val schema: StructType = StructType(List(StructField("num", IntegerType, nullable = true)))

@@ -137,7 +137,7 @@ class BasicIntegrationTests extends AsyncFlatSpec
val path = TempDirectory("spline", ".table", pathOnly = true).deleteOnExit().path

withNewSparkSession {
_.sql(s"create table e_table(num int) using parquet location '${path.toUri}'")
_.sql(s"CREATE TABLE e_table(num INT) USING PARQUET LOCATION '${path.toUri}'")
}

for {
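
A detail worth knowing about these tests: CREATE TABLE ... USING PARQUET LOCATION '<path>' defines an external table, so dropping it removes only the catalog entry and leaves the files at <path> intact. A minimal illustration (the path is hypothetical):

    spark.sql("CREATE TABLE e_table (num INT) USING PARQUET LOCATION '/tmp/e_table'")
    spark.sql("DROP TABLE e_table")  // catalog entry removed; files under /tmp/e_table remain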

@@ -52,7 +52,7 @@ class CassandraSpec

Using.resource(container.getCluster.connect()) { session =>
session.execute(s"CREATE KEYSPACE IF NOT EXISTS $keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};")
session.execute(s"CREATE TABLE IF NOT EXISTS $keyspace.$table (ID INT, NAME TEXT, PRIMARY KEY (ID))")
session.execute(s"CREATE TABLE IF NOT EXISTS $keyspace.$table (id INT, name TEXT, PRIMARY KEY (id))")
}

val testData: DataFrame = {

@@ -107,7 +107,7 @@ class CassandraSpec

Using.resource(container.getCluster.connect()) { session =>
session.execute(s"CREATE KEYSPACE IF NOT EXISTS $keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};")
session.execute(s"CREATE TABLE IF NOT EXISTS $keyspace.$table (ID INT, NAME TEXT, PRIMARY KEY (ID))")
session.execute(s"CREATE TABLE IF NOT EXISTS $keyspace.$table (id INT, name TEXT, PRIMARY KEY (id))")
}

val testData: DataFrame = {
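
Incidentally, unquoted CQL identifiers are folded to lower case by Cassandra, so renaming ID/NAME to id/name is a no-op on the database side as well. For context, a hedged sketch of how a spec like this typically writes testData into the container's table with the spark-cassandra-connector (option names per the connector docs; the actual write is below the fold):

    testData.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> keyspace, "table" -> table))
      .mode("append")
      .save()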

@@ -48,12 +48,12 @@ class DeltaDSV2Spec extends AsyncFlatSpec
withDatabase("testDB") {
val testData = {
import spark.implicits._
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("ID", "NAME")
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("id", "name")
}

for {
(plan1, Seq(event1)) <- lineageCaptor.lineageOf {
spark.sql("CREATE TABLE foo (ID int, NAME string) USING delta")
spark.sql("CREATE TABLE foo (id INT, name STRING) USING DELTA")
testData.write.format("delta").mode("append").saveAsTable("foo")
}
} yield {

@@ -77,12 +77,12 @@ class DeltaDSV2Spec extends AsyncFlatSpec
withDatabase("testDB") {
val testData = {
import spark.implicits._
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("ID", "NAME")
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("id", "name")
}

for {
(plan1, Seq(event1)) <- lineageCaptor.lineageOf {
spark.sql("CREATE TABLE foo (ID int, NAME string) USING delta")
spark.sql("CREATE TABLE foo (id INT, name STRING) USING DELTA")
testData.write.format("delta").mode("overwrite").insertInto("foo")
}
} yield {

@@ -115,11 +115,11 @@ class DeltaDSV2Spec extends AsyncFlatSpec

for {
(plan1, Seq(event1)) <- lineageCaptor.lineageOf {
spark.sql("CREATE TABLE foo (ID int, NAME string) USING delta PARTITIONED BY (ID)")
spark.sql("CREATE TABLE foo (id INT, name STRING) USING DELTA PARTITIONED BY (id)")
spark.sql(
"""
|INSERT OVERWRITE foo PARTITION (ID = 222222)
| (SELECT NAME FROM tempdata WHERE NAME = 'Warsaw')
|INSERT OVERWRITE foo PARTITION (id = 222222)
| (SELECT name FROM tempdata WHERE name = 'Warsaw')
|""".stripMargin)
}
} yield {

@@ -151,17 +151,17 @@ class DeltaDSV2Spec extends AsyncFlatSpec
withDatabase("testDB") {
val testData = {
import spark.implicits._
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("ID", "NAME")
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("id", "name")
}
testData.createOrReplaceTempView("tempdata")

for {
(plan1, Seq(event1)) <- lineageCaptor.lineageOf {
spark.sql("CREATE TABLE foo (ID int, NAME string) USING delta PARTITIONED BY (NAME)")
spark.sql("CREATE TABLE foo (id INT, name STRING) USING DELTA PARTITIONED BY (name)")
spark.sql(
"""
|INSERT OVERWRITE foo PARTITION (NAME)
| (SELECT ID, NAME FROM tempdata WHERE NAME = 'Warsaw')
|INSERT OVERWRITE foo PARTITION (name)
| (SELECT id, name FROM tempdata WHERE name = 'Warsaw')
|""".stripMargin)
}
} yield {

@@ -185,7 +185,7 @@ class DeltaDSV2Spec extends AsyncFlatSpec
withDatabase("testDB") {
val testData = {
import spark.implicits._
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("ID", "NAME")
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("id", "name")
}

for {

@@ -213,12 +213,12 @@ class DeltaDSV2Spec extends AsyncFlatSpec
withDatabase("testDB") {
val testData = {
import spark.implicits._
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("ID", "NAME")
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("id", "name")
}

for {
(plan1, Seq(event1)) <- lineageCaptor.lineageOf {
spark.sql("CREATE TABLE foo (toBeOrNotToBe boolean) USING delta")
spark.sql("CREATE TABLE foo (toBeOrNotToBe BOOLEAN) USING DELTA")
testData.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("foo")
}
} yield {

@@ -242,15 +242,15 @@ class DeltaDSV2Spec extends AsyncFlatSpec
withDatabase("testDB") {
val testData = {
import spark.implicits._
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("ID", "NAME")
Seq((1014, "Warsaw"), (1002, "Corte")).toDF("id", "name")
}

for {
(plan1, Seq(event1)) <- lineageCaptor.lineageOf {
testData.write.format("delta").saveAsTable("foo")
}
(plan2, Seq(event2)) <- lineageCaptor.lineageOf {
spark.sql("DELETE FROM foo WHERE ID == 1014")
spark.sql("DELETE FROM foo WHERE id == 1014")
}
} yield {
plan2.id.value shouldEqual event2.planId
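
Two of the hunks above contrast static and dynamic partition overwrites: PARTITION (id = 222222) rewrites exactly one named partition, while PARTITION (name) rewrites whichever partitions appear in the SELECT output. As a hedged reminder, the dynamic behavior of file-based sources is governed by a session config (Delta versions differ in how they honor it):

    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    spark.sql(
      """INSERT OVERWRITE foo PARTITION (name)
        | (SELECT id, name FROM tempdata WHERE name = 'Warsaw')
        |""".stripMargin)  // only the name = 'Warsaw' partition is replaced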

@@ -41,7 +41,7 @@ class DeltaSpec extends AsyncFlatSpec
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
val schema = StructType(StructField("ID", IntegerType, nullable = false) :: StructField("NAME", StringType, nullable = false) :: Nil)
val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
spark.sqlContext.createDataFrame(rdd, schema)
}

@@ -83,13 +83,13 @@ class DeltaSpec extends AsyncFlatSpec
withNewSparkSession { implicit spark =>
withLineageTracking { lineageCaptor =>
val testData: DataFrame = {
val schema = StructType(StructField("ID", IntegerType, nullable = false) :: StructField("NAME", StringType, nullable = false) :: Nil)
val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
spark.sqlContext.createDataFrame(rdd, schema)
}

withNewSparkSession {
_.sql(s"CREATE TABLE table_name(ID int, NAME string) USING DELTA LOCATION '$deltaPath'")
_.sql(s"CREATE TABLE table_name(id INT, name STRING) USING DELTA LOCATION '$deltaPath'")
}

for {
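
These specs build testData through an explicit StructType rather than toDF, presumably to pin nullable = false: the tuple-encoder path would mark the String column as nullable. The concise alternative, for comparison:

    import spark.implicits._
    val testData = Seq((1014, "Warsaw"), (1002, "Corte")).toDF("id", "name")
    // inferred schema: id INT NOT NULL, name STRING (nullable);
    // the explicit schema above makes both columns NOT NULL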

@@ -36,7 +36,7 @@ class ExcelSpec extends AsyncFlatSpec
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
val schema = StructType(StructField("ID", IntegerType, nullable = false) :: StructField("NAME", StringType, nullable = false) :: Nil)
val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
spark.sqlContext.createDataFrame(rdd, schema)
}

@@ -44,7 +44,7 @@ class FileFormatSpec extends AsyncFlatSpec
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
val schema = StructType(StructField("ID", IntegerType, nullable = false) :: StructField("NAME", StringType, nullable = false) :: Nil)
val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
spark.sqlContext.createDataFrame(rdd, schema)
}

@@ -74,7 +74,7 @@ class FileFormatSpec extends AsyncFlatSpec
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
val schema = StructType(StructField("ID", IntegerType, nullable = false) :: StructField("NAME", StringType, nullable = false) :: Nil)
val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
spark.sqlContext.createDataFrame(rdd, schema)
}

@@ -106,7 +106,7 @@ class FileFormatSpec extends AsyncFlatSpec
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
val schema = StructType(StructField("ID", IntegerType, nullable = false) :: StructField("NAME", StringType, nullable = false) :: Nil)
val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
spark.sqlContext.createDataFrame(rdd, schema)
}

@@ -138,7 +138,7 @@ class FileFormatSpec extends AsyncFlatSpec
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
val schema = StructType(StructField("ID", IntegerType, nullable = false) :: StructField("NAME", StringType, nullable = false) :: Nil)
val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
spark.sqlContext.createDataFrame(rdd, schema)
}

@@ -170,7 +170,7 @@ class FileFormatSpec extends AsyncFlatSpec
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
val schema = StructType(StructField("ID", IntegerType, nullable = false) :: StructField("NAME", StringType, nullable = false) :: Nil)
val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
spark.sqlContext.createDataFrame(rdd, schema)
}
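
All five hunks in this spec touch the same copy-pasted fixture. A hedged refactoring sketch that would have confined the rename to a single place (the helper name is hypothetical and would live inside the spec class):

    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    private def cityTestData(implicit spark: SparkSession): DataFrame = {
      val schema = StructType(
        StructField("id", IntegerType, nullable = false) ::
          StructField("name", StringType, nullable = false) :: Nil)
      val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
      spark.sqlContext.createDataFrame(rdd, schema)
    }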

@@ -44,7 +44,7 @@ class IcebergDSV2Spec extends AsyncFlatSpec
.config("spark.sql.catalog.iceberg_catalog.warehouse", warehouseUri.toString)
) { implicit spark =>
withLineageTracking { lineageCaptor =>
spark.sql("CREATE TABLE iceberg_catalog.foo (id bigint, data string) USING iceberg;")
spark.sql("CREATE TABLE iceberg_catalog.foo (id BIGINT, data STRING) USING ICEBERG;")

for {
(plan1, Seq(event1)) <- lineageCaptor.lineageOf {

@@ -67,8 +67,8 @@
.config("spark.sql.catalog.iceberg_catalog.warehouse", warehouseUri.toString)
) { implicit spark =>
withLineageTracking { lineageCaptor =>
spark.sql("CREATE TABLE iceberg_catalog.bar (id bigint, data string) USING iceberg;")
spark.sql("CREATE TABLE iceberg_catalog.baz (id bigint, data string) USING iceberg;")
spark.sql("CREATE TABLE iceberg_catalog.bar (id BIGINT, data STRING) USING ICEBERG;")
spark.sql("CREATE TABLE iceberg_catalog.baz (id BIGINT, data STRING) USING ICEBERG;")

for {
(_, Seq(_)) <- lineageCaptor.lineageOf {
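
The builder lines visible above set only the warehouse path; for completeness, a hedged sketch of the full catalog wiring an Iceberg-on-Hadoop test session needs (property names per the Iceberg Spark runtime docs; the spec's actual builder is partly elided):

    import org.apache.spark.sql.SparkSession

    SparkSession.builder()
      .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.iceberg_catalog.type", "hadoop")
      .config("spark.sql.catalog.iceberg_catalog.warehouse", warehouseUri.toString)
      .getOrCreate()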

@@ -39,8 +39,8 @@ class InsertIntoHiveTest
withLineageTracking { captor =>
val databaseName = s"unitTestDatabase_${this.getClass.getSimpleName}"
withDatabase(databaseName,
("path_archive", "(x String, ymd int) USING hive", Seq(("Tata", 20190401), ("Tere", 20190403))),
("path", "(x String) USING hive", Seq("Monika", "Buba"))
("path_archive", "(x STRING, ymd INT) USING HIVE", Seq(("Tata", 20190401), ("Tere", 20190403))),
("path", "(x String) USING HIVE", Seq("Monika", "Buba"))
) {
val df = spark
.table("path")

@@ -72,12 +72,12 @@ class InsertIntoHiveTest
withDatabase("test",
(
"path_archive_csvserde",
"(x String, ymd int) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'",
"(x STRING, ymd INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'",
Seq(("Tata", 20190401), ("Tere", 20190403))
),
(
"path_csvserde",
"(x String) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'",
"(x STRING) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'",
Seq("Monika", "Buba")
)
) {

@@ -108,11 +108,11 @@ class InsertIntoHiveTest
withLineageTracking { captor =>
withDatabase("test",
(
"path_archive_parquetserde", "(x String, ymd int) stored as PARQUET",
"path_archive_parquetserde", "(x STRING, ymd INT) STORED AS PARQUET",
Seq(("Tata", 20190401), ("Tere", 20190403))
),
(
"path_parquetserde", "(x String) stored as PARQUET",
"path_parquetserde", "(x STRING) STORED AS PARQUET",
Seq("Monika", "Buba")
)
) {

@@ -143,11 +143,11 @@ class InsertIntoHiveTest
withLineageTracking { captor =>
withDatabase("test",
(
"path_archive_orcserde", "(x String, ymd int) stored as orc",
"path_archive_orcserde", "(x string, ymd INT) STORED AS ORC",
Seq(("Tata", 20190401), ("Tere", 20190403))
),
(
"path_orcserde", "(x String) stored as orc",
"path_orcserde", "(x STRING) STORED AS ORC",
Seq("Monika", "Buba")
)
) {
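
This file exercises three Hive table flavors, now uniformly upper-cased: native USING HIVE, an explicit ROW FORMAT SERDE, and STORED AS PARQUET/ORC. Side by side as plain DDL on a Hive-enabled session (the table names here are hypothetical):

    spark.sql("CREATE TABLE t_native (x STRING) USING HIVE")
    spark.sql("CREATE TABLE t_csv (x STRING) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'")
    spark.sql("CREATE TABLE t_orc (x STRING) STORED AS ORC")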