Skip to content

Commit

Permalink
update the schema to session catalog after add column, drop column an…
Browse files Browse the repository at this point in the history
…d column rename
  • Loading branch information
akashrn5 committed Dec 28, 2018
1 parent 04b5256 commit dc6e066
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{CarbonEnv, SparkSession}

import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.format.TableInfo
Expand Down Expand Up @@ -79,15 +80,13 @@ class InMemorySessionCatalog(

/**
 * No-op for the in-memory session catalog: there are no external table
 * properties to update outside the in-memory metadata.
 *
 * @param tableIdentifier table being altered
 * @param schemaParts     serialized table properties (ignored here)
 * @param cols            altered columns (ignored here)
 */
override def alterTable(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]]): Unit = {
  // Not required in case of in-memory catalog.
}

override def alterAddColumns(tableIdentifier: TableIdentifier,
schemaParts: String,
newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
: Unit = {
newColumns: Option[Seq[ColumnSchema]]): Unit = {
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
val structType = catalogTable.schema
var newStructType = structType
Expand All @@ -101,8 +100,7 @@ class InMemorySessionCatalog(

override def alterDropColumns(tableIdentifier: TableIdentifier,
schemaParts: String,
dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
: Unit = {
dropCols: Option[Seq[ColumnSchema]]): Unit = {
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
val fields = catalogTable.schema.fields.filterNot { field =>
dropCols.get.exists { col =>
Expand All @@ -112,10 +110,9 @@ class InMemorySessionCatalog(
alterSchema(new StructType(fields), catalogTable, tableIdentifier)
}

override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
schemaParts: String,
columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
: Unit = {
columns: Option[Seq[ColumnSchema]]): Unit = {
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
val a = catalogTable.schema.fields.flatMap { field =>
columns.get.map { col =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.sql.internal.{SQLConf, SessionState}
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
import org.apache.spark.sql.parser.CarbonSparkSqlParser

import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
import org.apache.carbondata.spark.util.CarbonScalaUtil

/**
Expand Down Expand Up @@ -105,47 +106,37 @@ class CarbonHiveSessionCatalog(
.asInstanceOf[HiveExternalCatalog].client
}

def alterTableRename(oldTableIdentifier: TableIdentifier,
newTableIdentifier: TableIdentifier,
newTablePath: String): Unit = {
getClient().runSqlHive(
s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
getClient().runSqlHive(
s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
s"SET SERDEPROPERTIES" +
s"('tableName'='${ newTableIdentifier.table }', " +
s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
}

/**
 * Applies the serialized schema parts as TBLPROPERTIES on the given table in
 * the Hive metastore.
 *
 * @param tableIdentifier table to update
 * @param schemaParts     serialized table properties to set
 * @param cols            altered columns; not used by this implementation
 */
override def alterTable(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
  : Unit = {
  val qualifiedName = s"${ tableIdentifier.database.get }.${ tableIdentifier.table }"
  getClient().runSqlHive(s"ALTER TABLE $qualifiedName SET TBLPROPERTIES(${ schemaParts })")
}

/**
 * Handles an add-column operation: updates the table properties in the Hive
 * metastore, then pushes the updated column set to the external catalog so the
 * session catalog schema stays in sync.
 *
 * @param tableIdentifier table being altered
 * @param schemaParts     serialized table properties
 * @param cols            full, updated column set of the table
 */
override def alterAddColumns(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]]): Unit = {
  alterTable(tableIdentifier, schemaParts, cols)
  CarbonSessionUtil
    .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier,
      cols,
      schemaParts,
      sparkSession)
}

/**
 * Handles a drop-column operation: updates the table properties in the Hive
 * metastore, then pushes the remaining column set to the external catalog so
 * the session catalog schema stays in sync.
 *
 * @param tableIdentifier table being altered
 * @param schemaParts     serialized table properties
 * @param cols            remaining columns of the table after the drop
 */
override def alterDropColumns(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]]): Unit = {
  alterTable(tableIdentifier, schemaParts, cols)
  CarbonSessionUtil
    .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier,
      cols,
      schemaParts,
      sparkSession)
}

/**
 * Handles a data type change or column rename: updates the table properties in
 * the Hive metastore, then pushes the updated column set to the external
 * catalog so the session catalog schema stays in sync.
 *
 * @param tableIdentifier table being altered
 * @param schemaParts     serialized table properties
 * @param cols            full column set with the changed/renamed column applied
 */
override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]]): Unit = {
  alterTable(tableIdentifier, schemaParts, cols)
  CarbonSessionUtil
    .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier,
      cols,
      schemaParts,
      sparkSession)
}

override def createPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSes
import org.apache.spark.util.CarbonReflectionUtils

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.SparkTypeConverter

import scala.collection.mutable.ArrayBuffer

/**
* This class refresh the relation from cache if the carbontable in
Expand Down Expand Up @@ -93,4 +98,34 @@ object CarbonSessionUtil {
)
}

/**
 * Updates the external catalog's data schema after an alter operation
 * (data type change or column rename) so the session catalog stays in sync.
 *
 * @param tableIdentifier identifier of the altered table
 * @param cols            full set of table columns, already carrying the new
 *                        data type or the new column name
 * @param schemaParts     serialized schema parts; not used by this method,
 *                        kept for signature parity with the callers
 * @param sparkSession    active spark session
 */
def alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier: TableIdentifier,
    cols: Option[Seq[ColumnSchema]],
    schemaParts: String,
    sparkSession: SparkSession): Unit = {
  val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
  // Convert every visible carbon column to a Spark StructField; invisible
  // columns are excluded from the externally published schema.
  val visibleFields = cols.get.collect {
    case column if !column.isInvisible =>
      StructField(column.getColumnName,
        SparkTypeConverter.convertCarbonToSparkDataType(column, carbonTable))
  }
  // NOTE(review): database.get assumes the identifier always carries a database — confirm.
  sparkSession.sessionState.catalog.externalCatalog
    .alterTableDataSchema(tableIdentifier.database.get,
      tableIdentifier.table,
      StructType(visibleFields))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,17 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
schemaEvolutionEntry.setAdded(newCols.toList.asJava)
val thriftTable = schemaConverter
.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
// carbon columns based on schema order
val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
.collect { case carbonColumn => carbonColumn.getColumnSchema }
.filter(!_.isInvisible)
// sort the new columns based on schema order
val sortedColsBasedActualSchemaOrder = newCols.sortBy(a => a.getSchemaOrdinal)
val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
carbonTable,
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
thriftTable,
Some(newCols))(sparkSession)
Some(carbonColumns ++ sortedColsBasedActualSchemaOrder))(sparkSession)
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterAddColumns(
tableIdentifier, schemaParts, cols)
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
carbonTable,
tableInfo,
addColumnSchema,
schemaEvolutionEntry)
schemaEvolutionEntry,
oldCarbonColumn.head)
val alterTableColRenameAndDataTypeChangePostEvent
: AlterTableColRenameAndDataTypeChangePostEvent =
AlterTableColRenameAndDataTypeChangePostEvent(sparkSession, carbonTable,
Expand Down Expand Up @@ -262,13 +263,26 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
carbonTable: CarbonTable,
tableInfo: TableInfo,
addColumnSchema: ColumnSchema,
schemaEvolutionEntry: SchemaEvolutionEntry): Unit = {
schemaEvolutionEntry: SchemaEvolutionEntry,
oldCarbonColumn: CarbonColumn): Unit = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
// get the carbon column in schema order
val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
.filter(!_.isInvisible).collect{case carbonColumn => carbonColumn.getColumnSchema}
// get the schema ordinal of the column for which the datatype changed or column is renamed
val schemaOrdinal = carbonColumns.collect {
case columnSchema if columnSchema.getColumnName
.equalsIgnoreCase(oldCarbonColumn.getColName) => carbonColumns.indexOf(columnSchema)}.head
// update the schema changed column at the specific index in carbonColumns based on schemaorder
carbonColumns
.update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
carbonTable, schemaEvolutionEntry, tableInfo, Some(a))(sparkSession)
carbonTable,
schemaEvolutionEntry,
tableInfo,
Some(carbonColumns))(sparkSession)
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
.alterColumnChangeDataType(tableIdentifier, schemaParts, cols)
.alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, cols)
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
carbonTable,
schemaEvolutionEntry,
tableInfo,
Some(delCols))(sparkSession)
// get the columns in schema order and filter the dropped column in the column set
Some(carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
.filter(!_.isInvisible).collect {
case carbonColumn => carbonColumn.getColumnSchema
}.filterNot(column => delCols.contains(column))))(sparkSession)
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
.alterDropColumns(tableIdentifier, schemaParts, cols)
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
import org.apache.spark.sql.catalyst.expressions.Expression

import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}

/**
* This interface defines those common api used by carbon for spark-2.1 and spark-2.2 integration,
Expand Down Expand Up @@ -77,7 +78,16 @@ trait CarbonSessionCatalog {
*/
def alterTableRename(oldTableIdentifier: TableIdentifier,
    newTableIdentifier: TableIdentifier,
    newTablePath: String): Unit = {
  // Rename the table in the metastore, then refresh the Carbon-specific serde
  // properties so table name, db name and path reflect the new identity.
  getClient().runSqlHive(
    s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
    s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
  getClient().runSqlHive(
    s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table } " +
    s"SET SERDEPROPERTIES" +
    s"('tableName'='${ newTableIdentifier.table }', " +
    s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
}

/**
* Below method will be used to update serd properties
Expand All @@ -87,7 +97,11 @@ trait CarbonSessionCatalog {
*/
def alterTable(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]]): Unit = {
  // Apply the serialized schema parts as table properties in the metastore.
  getClient()
    .runSqlHive(s"ALTER TABLE ${ tableIdentifier.database.get }.${ tableIdentifier.table } " +
      s"SET TBLPROPERTIES(${ schemaParts })")
}

/**
* Below method will be used to add new column
Expand All @@ -97,7 +111,7 @@ trait CarbonSessionCatalog {
*/
def alterAddColumns(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]]): Unit

/**
* Below method will be used to drop column
Expand All @@ -107,15 +121,15 @@ trait CarbonSessionCatalog {
*/
def alterDropColumns(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]]): Unit

/**
* Below method will be used to alter data type of column in schema
* @param tableIdentifier table identifier
* @param schemaParts schema parts
* @param cols cols
*/
def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Experime
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.util.CarbonScalaUtil

Expand Down Expand Up @@ -85,45 +86,23 @@ class CarbonHiveSessionCatalog(
carbonEnv
}

def alterTableRename(oldTableIdentifier: TableIdentifier,
newTableIdentifier: TableIdentifier,
newTablePath: String): Unit = {
getClient().runSqlHive(
s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table }" +
s" RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
getClient().runSqlHive(
s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }" +
s" SET SERDEPROPERTIES" +
s"('tableName'='${ newTableIdentifier.table }', " +
s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
}

/**
 * Applies the serialized schema parts as TBLPROPERTIES on the given table in
 * the Hive metastore.
 *
 * @param tableIdentifier table to update
 * @param schemaParts     serialized table properties to set
 * @param cols            altered columns; not used by this implementation
 */
def alterTable(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
  : Unit = {
  val qualifiedName = s"${ tableIdentifier.database.get }.${ tableIdentifier.table }"
  getClient().runSqlHive(s"ALTER TABLE $qualifiedName SET TBLPROPERTIES(${ schemaParts })")
}

/**
 * Add-column handling for this integration: only the metastore table
 * properties need updating, delegated to [[alterTable]].
 */
def alterAddColumns(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]])
  : Unit = {
  alterTable(tableIdentifier, schemaParts, cols)
}

/**
 * Drop-column handling for this integration: only the metastore table
 * properties need updating, delegated to [[alterTable]].
 */
def alterDropColumns(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]])
  : Unit = {
  alterTable(tableIdentifier, schemaParts, cols)
}

/**
 * Data type change / column rename handling for this integration: only the
 * metastore table properties need updating, delegated to [[alterTable]].
 */
def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
    schemaParts: String,
    cols: Option[Seq[ColumnSchema]])
  : Unit = {
  alterTable(tableIdentifier, schemaParts, cols)
}
Expand Down

0 comments on commit dc6e066

Please sign in to comment.