-
Notifications
You must be signed in to change notification settings - Fork 7
/
ExasolRDD.scala
162 lines (133 loc) · 4.63 KB
/
ExasolRDD.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package com.exasol.spark.rdd
import java.sql.ResultSet
import java.sql.Statement
import scala.util.control.NonFatal
import org.apache.spark.Partition
import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerApplicationEnd
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types.StructType
import com.exasol.jdbc.EXAConnection
import com.exasol.jdbc.EXAResultSet
import com.exasol.spark.util.ExasolConnectionManager
/**
 * An [[org.apache.spark.rdd.RDD]] that reads data from Exasol tables.
 *
 * The [[com.exasol.spark.rdd.ExasolRDD]] loads data in parallel from each
 * Exasol physical node.
 */
class ExasolRDD(
  @transient val sc: SparkContext,
  queryString: String,
  querySchema: StructType,
  manager: ExasolConnectionManager
) extends RDD[Row](sc, Nil)
    with Logging {

  // The main (entry-node) JDBC resources live for the whole Spark application;
  // they are created lazily in getPartitions and torn down by closeMainResources.
  // scalastyle:off null
  @transient private var mainConnection: EXAConnection = null
  @transient private var mainStatement: Statement = null
  @transient private var mainResultSet: EXAResultSet = null
  // scalastyle:on

  /**
   * Closes the main result set, statement and connection, in that order.
   *
   * JDBC resources must be released result set -> statement -> connection;
   * the previous implementation closed the connection first, which left the
   * statement and result set closing against an already-dead connection.
   * Each close is guarded individually so a failure on one resource does not
   * leak the remaining ones.
   */
  def closeMainResources(): Unit = {
    if (mainResultSet != null) {
      try {
        mainResultSet.close()
      } catch {
        case NonFatal(e) => logWarning("Received an exception closing main result set", e)
      }
    }
    if (mainStatement != null) {
      try {
        mainStatement.close()
      } catch {
        case NonFatal(e) => logWarning("Received an exception closing main statement", e)
      }
    }
    if (mainConnection != null) {
      try {
        mainConnection.close()
      } catch {
        case NonFatal(e) => logWarning("Received an exception closing main connection", e)
      }
    }
  }

  /**
   * Establishes the main Exasol connection and initializes parallel
   * sub-connections on it.
   *
   * Registers a [[org.apache.spark.scheduler.SparkListener]] so the main
   * resources are released when the Spark application ends (their lifetime
   * is the lifetime of the Spark application).
   *
   * @return an open [[com.exasol.jdbc.EXAConnection]]
   * @throws RuntimeException if the main connection could not be established
   */
  def createMainConnection(): EXAConnection = {
    val conn = manager.mainConnection()
    if (conn == null) {
      logError("Main EXAConnection is null!")
      throw new RuntimeException("Could not establish main connection to Exasol!")
    }
    val cnt = manager.initParallel(conn)
    logInfo(s"Initiated $cnt parallel exasol (sub) connections")
    // Close Exasol main connection when SparkContext finishes. This is a
    // lifetime of a Spark application.
    sc.addSparkListener(new SparkListener {
      override def onApplicationEnd(appEnd: SparkListenerApplicationEnd): Unit =
        closeMainResources()
    })
    conn
  }

  /**
   * Runs the query on the main connection and derives one Spark partition per
   * Exasol sub-connection URL, all sharing the main result-set handle.
   */
  @SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
  override def getPartitions: Array[Partition] = {
    mainConnection = createMainConnection()
    mainStatement = mainConnection.createStatement()
    mainResultSet = mainStatement.executeQuery(queryString).asInstanceOf[EXAResultSet]
    val handle = mainResultSet.GetHandle()
    val partitions = manager
      .subConnections(mainConnection)
      .zipWithIndex
      .map { case (url, idx) => ExasolRDDPartition(idx, handle, url) }
    logInfo(s"The number of partitions is ${partitions.size}")
    partitions.toArray
  }

  /**
   * Computes one partition by opening a sub-connection to the partition's URL
   * and streaming the rows for its result-set handle.
   *
   * A handle of -1 means the partition has no results and yields an empty
   * iterator. Sub-connection resources are released via a task-completion
   * listener. (An unused, always-null `stmt` local and its dead close branch
   * were removed; the `return`-based control flow was replaced with
   * expression-oriented branches.)
   */
  // scalastyle:off null
  @SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    var closed = false
    var resultSet: ResultSet = null
    var conn: EXAConnection = null

    // Idempotent cleanup of the per-partition resources; each step is guarded
    // so one failure does not leak the remaining resources.
    def close(): Unit = if (!closed) {
      try {
        if (resultSet != null) {
          resultSet.close()
        }
      } catch {
        case NonFatal(e) => logWarning("Received an exception closing sub resultSet", e)
      }
      try {
        if (conn != null) {
          if (!conn.isClosed && !conn.getAutoCommit) {
            try {
              conn.commit()
            } catch {
              case NonFatal(e) => logWarning("Received exception committing sub connection", e)
            }
          }
          conn.close()
          logInfo("Closed a sub connection")
        }
      } catch {
        case NonFatal(e) => logWarning("Received an exception closing sub connection", e)
      }
      closed = true
    }

    val _ = context.addTaskCompletionListener[Unit] { _ =>
      close()
    }

    val partition: ExasolRDDPartition = split.asInstanceOf[ExasolRDDPartition]
    val subHandle: Int = partition.handle
    logInfo(s"Sub connection with url = ${partition.connectionUrl} and handle = $subHandle")

    if (subHandle == -1) {
      logInfo("Sub connection handle is -1, no results, return empty iterator")
      Iterator.empty
    } else {
      conn = manager.subConnection(partition.connectionUrl)
      resultSet = conn.DescribeResult(subHandle)
      JdbcUtils.resultSetToRows(resultSet, querySchema)
    }
  }
  // scalastyle:on null
}