/
SaveOptionsIT.scala
144 lines (121 loc) · 4.96 KB
/
SaveOptionsIT.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package com.exasol.spark
import java.sql.Date
import com.exasol.spark.util.Types
/**
 * Integration tests for saving Spark DataFrames into Exasol tables.
 *
 * The suite covers two areas:
 *  - the table helper methods of the connection manager (`tableExists`, `truncateTable`,
 *    `dropTable` and `createTable`), and
 *  - the outcome of each Spark save mode, with and without the `create_table` and
 *    `drop_table` connector options.
 */
class SaveOptionsIT extends BaseTableQueryIT {

  /** Save modes exercised by the tests that iterate over every mode. */
  private[this] val saveModes = Seq("append", "errorifexists", "ignore", "overwrite")

  /** Connector options used by default for saving; reassigned in `beforeEach`. */
  private[this] var defaultOptions: Map[String, String] = _

  /** Runs the parent setup and then rebuilds the default connector options. */
  override def beforeEach(): Unit = {
    super.beforeEach()
    defaultOptions = Map(
      "host" -> jdbcHost,
      "port" -> jdbcPort,
      "table" -> tableName
    )
  }

  /** Rows written by the save tests; the last column holds non-ASCII text. */
  // scalastyle:off nonascii
  private[this] val dataframeTestData: Seq[(String, String, Date, String)] = Seq(
    ("name1", "city1", Date.valueOf("2019-01-11"), "äpişge"),
    ("name1", "city2", Date.valueOf("2019-01-12"), "gül"),
    ("name2", "city1", Date.valueOf("2019-02-25"), "çigit"),
    ("name2", "city2", Date.valueOf("2019-02-25"), "okay")
  )
  // scalastyle:on nonascii

  /** Number of rows in `dataframeTestData`, as a `Long` to match the count query results. */
  private[this] val testDataSize: Long = dataframeTestData.size.toLong

  test("`tableExists` should return correct boolean result") {
    assert(exasolConnectionManager.tableExists(tableName) === true)
    assert(exasolConnectionManager.tableExists("DUMMY_SCHEMA.DUMMYTABLE") === false)
  }

  test("`truncateTable` should perform table truncation") {
    assert(countTableRows() > 0)
    exasolConnectionManager.truncateTable(tableName)
    assert(countTableRows() === 0)

    // Ensure it is idempotent
    exasolConnectionManager.truncateTable(tableName)
    assert(countTableRows() === 0)
  }

  test("`dropTable` should drop table") {
    assert(exasolConnectionManager.tableExists(tableName) === true)
    exasolConnectionManager.dropTable(tableName)
    assert(exasolConnectionManager.tableExists(tableName) === false)

    // Ensure it is idempotent
    exasolConnectionManager.dropTable(tableName)
    assert(exasolConnectionManager.tableExists(tableName) === false)
  }

  test("`createTable` should create a table") {
    // NOTE(review): `new_table` is not dropped at the end of this test; presumably the
    // base fixture resets the schema between runs -- confirm.
    val newTableName = s"$schema.new_table"
    assert(exasolConnectionManager.tableExists(newTableName) === false)

    import sqlContext.implicits._
    val df = sc
      .parallelize(Seq(("a", 103, Date.valueOf("2019-01-14"))))
      .toDF("str_col", "int_col", "date_col")

    // The table definition is derived from the Spark schema of the dataframe.
    val newTableSchema = Types.createTableSchema(df.schema)
    exasolConnectionManager.createTable(newTableName, newTableSchema)
    assert(exasolConnectionManager.tableExists(newTableName) === true)
  }

  test("save mode 'ignore' does not insert data if table exists") {
    val initialRecordsCount = countTableRows()
    assert(runDataFrameSave("ignore", 1) === initialRecordsCount)
  }

  test("save mode 'overwrite' overwrite if table exists") {
    assert(runDataFrameSave("overwrite", 2) === testDataSize)
  }

  test("save mode 'append' appends data if table exists") {
    val initialRecordsCount = countTableRows()
    val totalRecords = initialRecordsCount + testDataSize
    assert(runDataFrameSave("append", 3) === totalRecords)
  }

  test("save mode 'errorifexists' throws exception if table exists") {
    val thrown = intercept[UnsupportedOperationException] {
      runDataFrameSave("errorifexists", 4)
    }
    assert(thrown.getMessage.contains(s"Table $tableName already exists"))
  }

  test("save throws without 'create_table' or 'drop_table' option when table does not exist") {
    exasolConnectionManager.dropTable(tableName)
    saveModes.foreach { mode =>
      val thrown = intercept[UnsupportedOperationException] {
        runDataFrameSave(mode, 2)
      }
      assert(
        thrown.getMessage.contains(s"Table $tableName does not exist. Please enable")
      )
    }
  }

  test("save with 'create_table' option creates a new table before saving dataframe") {
    val newOptions = defaultOptions ++ Map("create_table" -> "true")
    saveModes.foreach { mode =>
      // Drop the table before each mode so that the save starts without it.
      exasolConnectionManager.dropTable(tableName)
      assert(runDataFrameSave(mode, 2, newOptions) === testDataSize)
    }
  }

  test("save with 'drop_table' option drops and creates a new table before saving dataframe") {
    val newOptions = defaultOptions ++ Map("drop_table" -> "true")
    saveModes.foreach { mode =>
      createTable()
      assert(runDataFrameSave(mode, 3, newOptions) === testDataSize)
    }
  }

  /** Returns the result of `SELECT COUNT(*)` on the test table. */
  private[this] def countTableRows(): Long =
    exasolConnectionManager.withCountQuery(s"SELECT COUNT(*) FROM $tableName")

  /**
   * Writes `dataframeTestData` using the `exasol` format and reports the resulting table size.
   *
   * @param mode Spark save mode passed to the dataframe writer
   * @param partitionCount number of partitions used when parallelizing the test data
   * @param options connector options for the write; defaults to `defaultOptions`
   * @return the row count of the test table after the save has finished
   */
  private[this] def runDataFrameSave(
    mode: String,
    partitionCount: Int,
    options: Map[String, String] = defaultOptions
  ): Long = {
    import sqlContext.implicits._
    val df = sc
      .parallelize(dataframeTestData, partitionCount)
      .toDF("name", "city", "date_info", "unicode_col")

    df.write
      .mode(mode)
      .options(options)
      .format("exasol")
      .save()

    countTableRows()
  }
}