/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.command.datamap

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand

import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
import org.apache.carbondata.events._

/**
 * Drops the datamap and any related tables associated with it.
 *
 * @param dataMapName name of the datamap to drop
 * @param ifExistsSet true when IF EXISTS is specified, so a missing datamap is not an error
 * @param table identifier of the main (parent) table, if the datamap is defined on one
 * @param forceDrop true only when a parent table schema update has failed and the child
 *                  table must be dropped forcefully from the metastore
 */
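// A minimal usage sketch (assumption: a CarbonData-enabled SparkSession named `spark`;
// the grammar shown follows CarbonData's documented DROP DATAMAP syntax):
//   spark.sql("DROP DATAMAP IF EXISTS datamap_name ON TABLE db_name.table_name")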
case class CarbonDropDataMapCommand(
    dataMapName: String,
    ifExistsSet: Boolean,
    table: Option[TableIdentifier],
    forceDrop: Boolean = false)
  extends AtomicRunnableCommand {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
  private var dataMapProvider: DataMapProvider = _
  var mainTable: CarbonTable = _
  var dataMapSchema: DataMapSchema = _

  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    setAuditInfo(Map("dmName" -> dataMapName))
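    // Two paths: when a parent table is given, lock it and drop the datamap through its
    // schema; otherwise resolve the datamap purely from the system folder.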
    if (table.isDefined) {
      val databaseNameOp = table.get.database
      val tableName = table.get.table
      val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
      val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
      val carbonEnv = CarbonEnv.getInstance(sparkSession)
      val catalog = carbonEnv.carbonMetaStore
      val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
      if (mainTable == null) {
        mainTable = try {
          CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
        } catch {
          case ex: NoSuchTableException =>
            throwMetadataException(dbName, tableName,
              s"Dropping datamap $dataMapName failed: ${ ex.getMessage }")
            null
        }
      }
      setAuditTable(mainTable)
      val tableIdentifier =
        AbsoluteTableIdentifier
          .from(tablePath,
            dbName.toLowerCase,
            tableName.toLowerCase,
            mainTable.getCarbonTableIdentifier.getTableId)
      // forceDrop will be true only when the parent table schema update has failed,
      // in which case the child table instance is forcefully dropped from the metastore.
      if (forceDrop) {
        val childTableName = tableName + "_" + dataMapName
        LOGGER.info(s"Trying to force drop $childTableName from metastore")
        val childCarbonTable: Option[CarbonTable] = try {
          Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession))
        } catch {
          case _: Exception =>
            LOGGER.warn(s"Child table $childTableName not found in metastore")
            None
        }
        if (childCarbonTable.isDefined) {
          val commandToRun = CarbonDropTableCommand(
            ifExistsSet = true,
            Some(childCarbonTable.get.getDatabaseName),
            childCarbonTable.get.getTableName,
            dropChildTable = true)
          commandToRun.run(sparkSession)
        }
        dropDataMapFromSystemFolder(sparkSession)
        return Seq.empty
      }
      val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
      try {
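        // Acquire the metadata lock so that concurrent DDL on the main table cannot
        // interleave with the drop.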
        locksToBeAcquired foreach {
          lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
        }
        // Drop an index or MV datamap defined on the main table.
        if (mainTable != null &&
            DataMapStoreManager.getInstance().getAllDataMap(mainTable).size() > 0) {
          val isDMSchemaExists = DataMapStoreManager.getInstance().getAllDataMap(mainTable).asScala.
            exists(_.getDataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName))
          if (isDMSchemaExists) {
            dropDataMapFromSystemFolder(sparkSession)
            return Seq.empty
          }
        } else if (mainTable != null) {
          // If the table is defined and the datamap is an MV datamap, drop the datamap.
          val dmSchema = DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala
            .filter(dataMapSchema => dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName))
          if (dmSchema.nonEmpty && (!dmSchema.head.isIndexDataMap &&
              null != dmSchema.head.getRelationIdentifier)) {
            dropDataMapFromSystemFolder(sparkSession)
            return Seq.empty
          }
        }
        // Drop a preaggregate datamap:
        // if the datamap to be dropped belongs to the parent table, drop it from the
        // metastore and remove its entry from the parent table schema. If force drop is
        // true, the datamap was already removed from the Hive metastore above, so there
        // is no need to remove it from the parent here.
        if (mainTable != null && mainTable.getTableInfo.getDataMapSchemaList.size() > 0) {
          val dataMapSchemaOp = mainTable.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
            find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
          if (dataMapSchemaOp.isDefined) {
            dataMapSchema = dataMapSchemaOp.get._1
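            // Fire the pre event, remove the schema entry from the parent table, persist
            // the updated table schema to the metastore, then let the provider clean its
            // own metadata before the post event is fired.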
            val operationContext = new OperationContext
            val dropDataMapPreEvent =
              DropDataMapPreEvent(
                Some(dataMapSchema),
                ifExistsSet,
                sparkSession)
            OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext)
            mainTable.getTableInfo.getDataMapSchemaList.remove(dataMapSchemaOp.get._2)
            val schemaConverter = new ThriftWrapperSchemaConverterImpl
            PreAggregateUtil.updateSchemaInfo(
              mainTable,
              schemaConverter.fromWrapperToExternalTableInfo(
                mainTable.getTableInfo,
                dbName,
                tableName))(sparkSession)
            if (dataMapProvider == null) {
              dataMapProvider =
                DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
            }
            dataMapProvider.cleanMeta()
            // Fire the post event after the datamap has been dropped from the main table schema.
            val dropDataMapPostEvent =
              DropDataMapPostEvent(
                Some(dataMapSchema),
                ifExistsSet,
                sparkSession)
            OperationListenerBus.getInstance.fireEvent(dropDataMapPostEvent, operationContext)
          } else if (!ifExistsSet) {
            throw new NoSuchDataMapException(dataMapName, tableName)
          }
        } else if (!ifExistsSet) {
          throw new NoSuchDataMapException(dataMapName)
        }
      } catch {
        case e: NoSuchDataMapException =>
          throw e
        case ex: Exception =>
          LOGGER.error(s"Dropping datamap $dataMapName failed", ex)
          throwMetadataException(dbName, tableName,
            s"Dropping datamap $dataMapName failed: ${ ex.getMessage }")
      } finally {
        if (carbonLocks.nonEmpty) {
          val unlocked = carbonLocks.forall(_.unlock())
          if (unlocked) {
            LOGGER.info("Table MetaData Unlocked Successfully")
          }
        }
      }
    } else {
      try {
        dropDataMapFromSystemFolder(sparkSession)
      } catch {
        case e: Exception =>
          if (!ifExistsSet) {
            throw e
          }
      }
    }
    Seq.empty
  }
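
  /**
   * Drops a datamap whose schema is stored in the system folder (index and MV datamaps):
   * fires the update events, removes the status entry, and cleans metadata (and, for
   * index datamaps, in-memory data) through the resolved DataMapProvider.
   */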
  private def dropDataMapFromSystemFolder(sparkSession: SparkSession): Unit = {
    try {
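      // Resolve the schema lazily if processMetadata has not already found it on a
      // parent table.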
      if (dataMapSchema == null) {
        dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
      }
      if (dataMapSchema != null) {
        dataMapProvider =
          DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
        val operationContext: OperationContext = new OperationContext()
        val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
        val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
          UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation, null)
        OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
          operationContext)
        DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName)
        val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
          UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation, null)
        OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
          operationContext)
        // For an index datamap provider such as Lucene, call cleanData first: it launches
        // a job that clears the datamap from memory (the segment map and the cache). This
        // is done before the datamap schemas are deleted from the _System folder.
        if (dataMapProvider.isInstanceOf[IndexDataMapProvider]) {
          dataMapProvider.cleanData()
        }
        dataMapProvider.cleanMeta()
      }
    } catch {
      case e: Exception =>
        if (!ifExistsSet) {
          throw e
        }
    }
  }
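
  /**
   * Physical cleanup phase of the atomic command: deletes the datamap's data (for
   * example the child table folder) once the metadata changes have been committed.
   */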
  override def processData(sparkSession: SparkSession): Seq[Row] = {
    // Delete the table folder.
    if (dataMapProvider != null) {
      dataMapProvider.cleanData()
    }
    Seq.empty
  }

  override protected def opName: String = "DROP DATAMAP"
}