IndexedTable.scala
/*
 * Copyright 2021 Qbeast Analytics, S.L.
 */
package io.qbeast.spark.table

import io.qbeast.core.keeper.Keeper
import io.qbeast.core.model._
import io.qbeast.spark.delta.CubeDataLoader
import io.qbeast.spark.index.QbeastColumns
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.internal.sources.QbeastBaseRelation
import org.apache.spark.sql.delta.actions.FileAction
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisExceptionFactory, DataFrame}
/**
 * Indexed table represents the tabular data storage
 * indexed with the OTree indexing technology.
 */
trait IndexedTable {

  /**
   * Returns whether the table physically exists.
   * @return true if the table physically exists, false otherwise
   */
  def exists: Boolean

  /**
   * Returns the table id which identifies the table.
   *
   * @return the table id
   */
  def tableID: QTableID

  /**
   * Saves the given data in the table and updates the index. The specified columns are
   * used to define the index when the table is created or overwritten. The append
   * flag defines whether the data should be appended to the existing data or
   * overwrite it.
   *
   * @param data the data to save
   * @param parameters the parameters used to save the data
   * @param append whether the data should be appended to the existing table data
   * @return the base relation to read the saved data
   */
  def save(data: DataFrame, parameters: Map[String, String], append: Boolean): BaseRelation

  /**
   * Loads the table data.
   *
   * @return the base relation to read the table data
   */
  def load(): BaseRelation

  /**
   * Analyzes the index for a given revision.
   * @param revisionID the identifier of the revision to analyze
   * @return the identifiers of the cubes to analyze
   */
  def analyze(revisionID: RevisionID): Seq[String]

  /**
   * Optimizes the table for a given revision.
   * @param revisionID the identifier of the revision to optimize
   */
  def optimize(revisionID: RevisionID): Unit
}
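
// A minimal usage sketch of the IndexedTable API, kept as a comment; the names
// (`factory`, `df`), path and option values below are illustrative assumptions,
// not part of this file:
//
//   val table: IndexedTable = factory.getIndexedTable(QTableID("/tmp/qbeast-table"))
//   table.save(df, Map("columnsToIndex" -> "a,b", "cubeSize" -> "10000"), append = false)
//   val relation = table.load()
//   val cubes = table.analyze(revisionID = 1L)
//   table.optimize(revisionID = 1L)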
/**
 * IndexedTable factory.
 */
trait IndexedTableFactory {

  /**
   * Returns an IndexedTable for the given table identifier.
   * It is not guaranteed that the returned table physically
   * exists; use IndexedTable#exists to verify it.
   *
   * @param tableId the table identifier
   * @return the table
   */
  def getIndexedTable(tableId: QTableID): IndexedTable
}
/**
 * Implementation of IndexedTableFactory.
 * @param keeper the keeper
 * @param indexManager the index manager
 * @param metadataManager the metadata manager
 * @param dataWriter the data writer
 * @param revisionBuilder the revision builder
 */
final class IndexedTableFactoryImpl(
    private val keeper: Keeper,
    private val indexManager: IndexManager[DataFrame],
    private val metadataManager: MetadataManager[StructType, FileAction],
    private val dataWriter: DataWriter[DataFrame, StructType, FileAction],
    private val revisionBuilder: RevisionFactory[StructType])
    extends IndexedTableFactory {

  override def getIndexedTable(tableID: QTableID): IndexedTable =
    new IndexedTableImpl(
      tableID,
      keeper,
      indexManager,
      metadataManager,
      dataWriter,
      revisionBuilder)
}
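
// A minimal wiring sketch for the factory, kept as a comment; `keeper`,
// `indexManager`, `metadataManager`, `dataWriter` and `revisionBuilder` stand
// for concrete component instances assumed to exist elsewhere:
//
//   val factory: IndexedTableFactory = new IndexedTableFactoryImpl(
//     keeper, indexManager, metadataManager, dataWriter, revisionBuilder)
//   val table = factory.getIndexedTable(QTableID("/tmp/qbeast-table"))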
/**
 * Implementation of IndexedTable.
 *
 * @param tableID the table identifier
 * @param keeper the keeper
 * @param indexManager the index manager
 * @param metadataManager the metadata manager
 * @param dataWriter the data writer
 * @param revisionBuilder the revision builder
 */
private[table] class IndexedTableImpl(
    val tableID: QTableID,
    private val keeper: Keeper,
    private val indexManager: IndexManager[DataFrame],
    private val metadataManager: MetadataManager[StructType, FileAction],
    private val dataWriter: DataWriter[DataFrame, StructType, FileAction],
    private val revisionBuilder: RevisionFactory[StructType])
    extends IndexedTable {

  private var snapshotCache: Option[QbeastSnapshot] = None

  override def exists: Boolean = !snapshot.isInitial

  private def checkRevisionParameters(
      qbeastOptions: QbeastOptions,
      latestRevision: Revision): Boolean = {
    // TODO feature: columnsToIndex may change between revisions
    latestRevision.desiredCubeSize == qbeastOptions.cubeSize
  }

  override def save(
      data: DataFrame,
      parameters: Map[String, String],
      append: Boolean): BaseRelation = {
    val indexStatus =
      if (exists) {
        val latestIndexStatus = snapshot.loadLatestIndexStatus
        if (checkRevisionParameters(QbeastOptions(parameters), latestIndexStatus.revision)) {
          latestIndexStatus
        } else {
          val oldRevisionID = latestIndexStatus.revision.revisionID
          val newRevision = revisionBuilder
            .createNextRevision(tableID, data.schema, parameters, oldRevisionID)
          IndexStatus(newRevision)
        }
      } else {
        IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, parameters))
      }

    if (exists && append) {
      checkColumnsToMatchSchema(indexStatus)
    }

    val relation = write(data, indexStatus, append)
    relation
  }
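
  // Illustrative note on the revision selection above (values are assumptions,
  // not taken from this file): two consecutive saves with the same cubeSize
  // reuse the latest revision, while changing it triggers createNextRevision.
  //
  //   table.save(df, Map("columnsToIndex" -> "a,b", "cubeSize" -> "10000"), append = true)
  //   table.save(df, Map("columnsToIndex" -> "a,b", "cubeSize" -> "5000"), append = true)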
  override def load(): BaseRelation = {
    clearCaches()
    createQbeastBaseRelation()
  }

  private def snapshot = {
    if (snapshotCache.isEmpty) {
      snapshotCache = Some(metadataManager.loadSnapshot(tableID))
    }
    snapshotCache.get
  }

  private def clearCaches(): Unit = {
    snapshotCache = None
  }

  private def checkColumnsToMatchSchema(indexStatus: IndexStatus): Unit = {
    val columnsToIndex = indexStatus.revision.columnTransformers.map(_.columnName)
    if (!snapshot.loadLatestRevision.matchColumns(columnsToIndex)) {
      throw AnalysisExceptionFactory.create(
        s"Columns to index '$columnsToIndex' do not match existing index.")
    }
  }

  private def createQbeastBaseRelation(): QbeastBaseRelation = {
    QbeastBaseRelation.forDeltaTable(tableID)
  }

  private def write(data: DataFrame, indexStatus: IndexStatus, append: Boolean): BaseRelation = {
    val revision = indexStatus.revision

    if (exists) {
      keeper.withWrite(indexStatus.revision.tableID, revision.revisionID) { write =>
        val announcedSet = write.announcedCubes
        val updatedStatus = indexStatus.addAnnouncements(announcedSet)
        doWrite(data, updatedStatus, append)
      }
    } else {
      doWrite(data, indexStatus, append)
    }
    clearCaches()
    createQbeastBaseRelation()
  }
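
  // Note: the keeper write above exposes the cubes announced by the keeper
  // (for example by a concurrent optimization); they are added to the index
  // status so the indexing step can take them into account before writing.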
  private def doWrite(data: DataFrame, indexStatus: IndexStatus, append: Boolean): Unit = {
    val schema = data.schema
    metadataManager.updateWithTransaction(tableID, schema, append) {
      val (qbeastData, tableChanges) =
        indexManager.index(data, indexStatus)
      val fileActions = dataWriter.write(tableID, schema, qbeastData, tableChanges)
      (tableChanges, fileActions)
    }
  }
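
  // Note: updateWithTransaction evaluates the block above inside a metadata
  // transaction; the block returns the table changes together with the file
  // actions produced by the data writer, so both are committed in a single update.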
  override def analyze(revisionID: RevisionID): Seq[String] = {
    val indexStatus = snapshot.loadIndexStatus(revisionID)
    val cubesToAnnounce = indexManager.analyze(indexStatus)
    keeper.announce(tableID, revisionID, cubesToAnnounce)
    cubesToAnnounce.map(_.string)
  }
  override def optimize(revisionID: RevisionID): Unit = {
    // begin keeper transaction
    val bo = keeper.beginOptimization(tableID, revisionID)

    val currentIndexStatus = snapshot.loadIndexStatus(revisionID)
    val indexStatus = currentIndexStatus.addAnnouncements(bo.cubesToOptimize)
    val cubesToReplicate = indexStatus.cubesToOptimize
    val schema = metadataManager.loadCurrentSchema(tableID)

    if (cubesToReplicate.nonEmpty) {
      metadataManager.updateWithTransaction(tableID, schema, append = true) {
        val dataToReplicate =
          CubeDataLoader(tableID).loadSetWithCubeColumn(
            cubesToReplicate,
            indexStatus.revision,
            QbeastColumns.cubeToReplicateColumnName)
        val (qbeastData, tableChanges) =
          indexManager.optimize(dataToReplicate, indexStatus)
        val fileActions =
          dataWriter.write(tableID, schema, qbeastData, tableChanges)
        (tableChanges, fileActions)
      }
    }

    bo.end(cubesToReplicate)
    // end keeper transaction

    clearCaches()
  }

}