[CARBONDATA-4206] Support rename SI table
Why is this PR needed?
Currently, renaming an SI table succeeds, but afterwards inserts and queries on the main
table fail with a "no such table" exception. This is because the main table's tblproperties
are not updated when the SI table is renamed: they still store the old SI table name, so any
reference to the SI table looks it up by the old name, which leads to the "no such table" exception.
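
A minimal repro sketch of the failure, written in the style of the tests added by this PR; the table and index names here are illustrative, and sql(...) assumes a Spark/Carbon test context such as QueryTest:

sql("create table maintable (imei string, mac string) stored as carbondata")
sql("create index idx_mac on table maintable(mac) as 'carbondata'")
sql("alter table idx_mac rename to idx_mac_new")
// Before this fix, maintable's tblproperties still referenced 'idx_mac', so both of the
// following statements failed with a "no such table" exception:
sql("insert into maintable select '1', '2'")
sql("select count(*) from maintable where mac = '2'").collect()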

What changes were proposed in this PR?
After the SI table is renamed, update the main table's tblproperties with the new SI information.
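
In essence, the fix does the following (a simplified sketch of the logic added to CarbonIndexUtil.updateIndexInfo and IndexMetadata.renameIndexWithStatus in the diffs below; parentTable, oldIndexName, newIndexName and indexStatus are placeholder names):

// Move the index entry from the old name to the new name in the parent table's
// IndexMetadata, keep its status, then write the re-serialized metadata back into
// the parent table's tblproperties so that later lookups resolve the new SI name.
val indexMetadata = parentTable.getIndexMetadata
indexMetadata.renameIndexWithStatus(IndexType.SI.getIndexProviderName,
  oldIndexName, newIndexName, indexStatus.name())
parentTable.getTableInfo
  .getFactTable
  .getTableProperties
  .put(parentTable.getCarbonTableIdentifier.getTableId, indexMetadata.serialize)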

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #4149
jack86596 authored and Indhumathi27 committed Jun 16, 2021
1 parent 90841bc commit f1da9e8c155297ea22808e5b01137cd84e9561d9
Showing 5 changed files with 135 additions and 34 deletions.
@@ -97,6 +97,17 @@ public void removeIndexTableInfo(String indexName) {
}
}

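/**
 * Moves the index entry for the given provider from the old index name to the new index
 * name and records the supplied index status on the moved entry.
 */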
public void renameIndexWithStatus(String indexProvider, String oldIndexName,
String newIndexName, String indexStatus) {
if (null != indexProviderMap) {
Map<String, String> properties = indexProviderMap.get(indexProvider).remove(oldIndexName);
if (properties != null) {
properties.put(CarbonCommonConstants.INDEX_STATUS, indexStatus);
indexProviderMap.get(indexProvider).put(newIndexName, properties);
}
}
}

public void updateIndexStatus(String indexProvider, String indexName, String indexStatus) {
if (null != indexProviderMap) {
indexProviderMap.get(indexProvider).get(indexName)
@@ -162,4 +173,8 @@ public static IndexMetadata deserialize(String serializedIndexMeta) throws IOExc
public String getIndexColumns(String provider, String indexName) {
return indexProviderMap.get(provider).get(indexName).get(CarbonCommonConstants.INDEX_COLUMNS);
}

public String getIndexStatus(String provider, String indexName) {
return indexProviderMap.get(provider).get(indexName).get(CarbonCommonConstants.INDEX_STATUS);
}
}
@@ -22,14 +22,17 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil, MockClassForAlterRevertTests}
import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.util.AlterTableUtil

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.index.IndexStoreManager
import org.apache.carbondata.core.index.status.IndexStatus
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
@@ -69,31 +72,26 @@ private[sql] case class CarbonAlterTableRenameCommand(
throwMetadataException(oldDatabaseName, oldTableName, "Table does not exist")
}

var oldCarbonTable: CarbonTable = null
oldCarbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
.asInstanceOf[CarbonRelation].carbonTable
if (!oldCarbonTable.getTableInfo.isTransactionalTable) {
var carbonTable: CarbonTable = relation.carbonTable
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}

if (!oldCarbonTable.canAllow(oldCarbonTable, TableOperation.ALTER_RENAME)) {
if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_RENAME)) {
throw new MalformedCarbonCommandException("alter rename is not supported for this table")
}
// if the table has an MV created on it, table rename is not supported
if (MVManagerInSpark.get(sparkSession).hasSchemaOnTable(oldCarbonTable) ||
oldCarbonTable.isMV) {
if (MVManagerInSpark.get(sparkSession).hasSchemaOnTable(carbonTable) || carbonTable.isMV) {
throw new MalformedCarbonCommandException(
"alter rename is not supported for MV table or for tables which have child MV")
}

var timeStamp = 0L
var carbonTable: CarbonTable = null
var hiveRenameSuccess = false
// lock file path to release locks after operation
var carbonTableLockFilePath: String = null
var originalIndexStatusBeforeDisable: IndexStatus = null
try {
carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
.asInstanceOf[CarbonRelation].carbonTable
carbonTableLockFilePath = carbonTable.getTablePath
// if any load is in progress for table, do not allow rename table
if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
@@ -104,13 +102,19 @@ private[sql] case class CarbonAlterTableRenameCommand(
IndexStoreManager.getInstance().clearIndex(oldAbsoluteTableIdentifier)
// get the latest carbon table and check for column existence
val operationContext = new OperationContext
// TODO: Pass new Table Path in pre-event.
val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
carbonTable,
alterTableRenameModel,
"",
sparkSession)
OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
if (carbonTable.isIndexTable) {
val oldIndexName = alterTableRenameModel.oldTableIdentifier.table
val parentTableName = carbonTable.getParentTableName
val parentTable: CarbonTable = CarbonEnv.getCarbonTable(
Some(oldDatabaseName), parentTableName)(sparkSession)
originalIndexStatusBeforeDisable = CarbonIndexUtil.updateIndexInfo(
parentTable, oldIndexName, IndexType.SI, IndexStatus.DISABLED)(sparkSession)
} else {
// TODO: Pass new Table Path in pre-event.
val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
carbonTable, alterTableRenameModel, "", sparkSession)
OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
}
val tableInfo: org.apache.carbondata.format.TableInfo =
metastore.getThriftTableInfo(carbonTable)
val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
@@ -142,12 +146,24 @@ private[sql] case class CarbonAlterTableRenameCommand(
carbonTable.getTablePath)(sparkSession)
new MockClassForAlterRevertTests().mockForAlterRevertTest()

val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
carbonTable,
alterTableRenameModel,
oldAbsoluteTableIdentifier.getTablePath,
sparkSession)
OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
if (carbonTable.isIndexTable) {
val oldIndexName = alterTableRenameModel.oldTableIdentifier.table
val parentTableName = carbonTable.getParentTableName
val parentTable: CarbonTable = metastore
.lookupRelation(Some(oldDatabaseName), parentTableName)(sparkSession)
.asInstanceOf[CarbonRelation].carbonTable
CarbonIndexUtil.updateIndexInfo(parentTable, oldIndexName, IndexType.SI,
originalIndexStatusBeforeDisable, newTableName)(sparkSession)
metastore.lookupRelation(Some(oldDatabaseName), newTableName)(sparkSession)
.asInstanceOf[CarbonRelation]
} else {
val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
carbonTable,
alterTableRenameModel,
oldAbsoluteTableIdentifier.getTablePath,
sparkSession)
OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
}

sparkSession.catalog.refreshTable(newTableIdentifier.quotedString)
LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
@@ -163,6 +179,18 @@ private[sql] case class CarbonAlterTableRenameCommand(
sparkSession,
carbonTable.isExternalTable)
}
// the table being renamed is an index table and disabling the index succeeded, so revert the index status
if (originalIndexStatusBeforeDisable != null &&
!originalIndexStatusBeforeDisable.equals(IndexStatus.DISABLED)) {
val oldIndexName = alterTableRenameModel.oldTableIdentifier.table
val parentTableName = carbonTable.getParentTableName
val parentTable: CarbonTable = CarbonEnv.getCarbonTable(
Some(oldDatabaseName), parentTableName)(sparkSession)
CarbonIndexUtil.updateIndexInfo(parentTable, oldIndexName, IndexType.SI,
originalIndexStatusBeforeDisable)(sparkSession)
metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
.asInstanceOf[CarbonRelation]
}
if (carbonTable != null) {
AlterTableUtil.revertRenameTableChanges(
newTableName,
@@ -435,12 +435,17 @@ object CarbonIndexUtil {
}
}

def updateIndexStatus(carbonTable: CarbonTable,
/**
* Updates the index status in the parent table's metadata, optionally renaming the index.
*
* @param newIndexName a non-empty value means the index is also being renamed
* @return the original index status; null means the update failed
*/
def updateIndexInfo(carbonTable: CarbonTable,
indexName: String,
indexType: IndexType,
status: IndexStatus,
needLock: Boolean = true,
sparkSession: SparkSession): Unit = {
newIndexName: String = "",
needLock: Boolean = true)(sparkSession: SparkSession): IndexStatus = {
val dbName = carbonTable.getDatabaseName
val tableName = carbonTable.getTableName
val locks: java.util.List[ICarbonLock] = new java.util.ArrayList[ICarbonLock]
@@ -458,23 +463,32 @@ object CarbonIndexUtil {
}
CarbonMetadata.getInstance.removeTable(dbName, tableName)
val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val indexInfo = IndexTableInfo.setIndexStatus(table.getIndexInfo, indexName, status)
val indexMetadata = table.getIndexMetadata
val originalStatus = indexMetadata.getIndexStatus(indexType.getIndexProviderName, indexName)
if (newIndexName.isEmpty) {
indexMetadata.updateIndexStatus(indexType.getIndexProviderName, indexName, status.name())
} else {
indexMetadata.renameIndexWithStatus(indexType.getIndexProviderName,
indexName, newIndexName, status.name())
}
val newIndexInfo = table.getIndexInfo
indexMetadata.updateIndexStatus(indexType.getIndexProviderName, indexName, status.name())
table.getTableInfo
.getFactTable
.getTableProperties
.put(table.getCarbonTableIdentifier.getTableId, indexMetadata.serialize)
sparkSession.sql(
s"""ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES ('indexInfo' =
|'$indexInfo')"""
|'$newIndexInfo')"""
.stripMargin).collect()
CarbonHiveIndexMetadataUtil.refreshTable(dbName, tableName, sparkSession)
CarbonMetadata.getInstance.removeTable(dbName, tableName)
CarbonMetadata.getInstance.loadTableMetadata(table.getTableInfo)
IndexStatus.valueOf(originalStatus)
} catch {
case e: Exception =>
LOGGER.error("Failed to update index status for %s".format(indexName))
null
} finally {
AlterTableUtil.releaseLocks(locks.asScala.toList)
}
@@ -23,7 +23,6 @@ import org.apache.log4j.Logger
import org.apache.spark.internal.Logging
import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.hive._
import org.apache.spark.sql.index.CarbonIndexUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.index.IndexType
@@ -40,11 +39,11 @@ class AlterTableRenameEventListener extends OperationEventListener with Logging
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
event match {
case alterTableRenamePreEvent: AlterTableRenamePostEvent =>
LOGGER.info("alter table rename Pre event listener called")
val alterTableRenameModel = alterTableRenamePreEvent.alterTableRenameModel
val carbonTable = alterTableRenamePreEvent.carbonTable
val sparkSession = alterTableRenamePreEvent.sparkSession
case alterTableRenamePostEvent: AlterTableRenamePostEvent =>
LOGGER.info("alter table rename post event listener called")
val alterTableRenameModel = alterTableRenamePostEvent.alterTableRenameModel
val carbonTable = alterTableRenamePostEvent.carbonTable
val sparkSession = alterTableRenamePostEvent.sparkSession
val oldDatabaseName = carbonTable.getDatabaseName
val newTableName = alterTableRenameModel.newTableIdentifier.table
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -17,11 +17,15 @@

package org.apache.carbondata.spark.testsuite.createTable

import mockit.{Mock, MockUp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.MockClassForAlterRevertTests
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.exception.ProcessMetaDataException

/**
* test functionality for alter table with indexSchema
@@ -34,6 +38,7 @@ class TestRenameTableWithIndex extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS carbon_tb")
sql("DROP TABLE IF EXISTS fact_table1")
sql("DROP TABLE IF EXISTS x1")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
}
@@ -116,6 +121,45 @@ class TestRenameTableWithIndex extends QueryTest with BeforeAndAfterAll {
true, "dm_carbon_si")
}

test("rename index table success, insert new record success" +
" and query hit new index table") {
sql("create table if not exists x1 (imei string, mac string) stored as carbondata")
sql("create index idx_x1_mac on table x1(mac) as 'carbondata'")
sql("alter table idx_x1_mac rename to idx_x1_mac1")
checkAnswer(sql("show indexes on x1"),
Row("idx_x1_mac1", "carbondata", "mac", "NA", "enabled", "NA"))
checkAnswer(sql("insert into x1 select '1', '2'"), Row("0"))
assert(sql("explain select * from x1 where mac = '2'")
.collect()(1).getString(0).contains("idx_x1_mac1"))
checkAnswer(sql("select count(*) from x1 where mac = '2'"), Row(1))
sql("DROP TABLE IF EXISTS x1")
}

test("rename index table fail, revert success, insert new record success" +
" and query hit old index table") {
val mock: MockUp[MockClassForAlterRevertTests] = new MockUp[MockClassForAlterRevertTests]() {
@Mock
@throws[ProcessMetaDataException]
def mockForAlterRevertTest(): Unit = {
throw new ProcessMetaDataException("default", "idx_x1_mac", "thrown in mock")
}
}
sql("create table if not exists x1 (imei string, mac string) stored as carbondata")
sql("create index idx_x1_mac on table x1(mac) as 'carbondata'")
intercept[ProcessMetaDataException] {
sql("alter table idx_x1_mac rename to idx_x1_mac1")
}
checkAnswer(sql("show indexes on x1"),
Row("idx_x1_mac", "carbondata", "mac", "NA", "enabled", "NA"))
checkAnswer(sql("insert into x1 select '1', '2'"), Row("0"))
val plan = sql("explain select * from x1 where mac = '2'").collect()(1).getString(0)
assert(plan.contains("idx_x1_mac"))
assert(!plan.contains("idx_x1_mac1"))
checkAnswer(sql("select count(*) from x1 where mac = '2'"), Row(1))
sql("DROP TABLE IF EXISTS x1")
mock.tearDown();
}

/*
* mv indexSchema does not support running here; it must run in the mv project.
test("Creating a mv indexSchema,then table rename") {
@@ -155,6 +199,7 @@ class TestRenameTableWithIndex extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS carbon_tb")
sql("DROP TABLE IF EXISTS fact_table1")
sql("DROP TABLE IF EXISTS x1")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
