Commit 71a6d0c: Merge e3c451b into d5a2c69
2 parents d5a2c69 + e3c451b
akashrn5 committed Dec 27, 2018

Showing 8 changed files with 112 additions and 77 deletions.
@@ -35,6 +35,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 
+import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format.TableInfo
@@ -79,15 +80,13 @@ class InMemorySessionCatalog(
 
   override def alterTable(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
+      cols: Option[Seq[ColumnSchema]]): Unit = {
     // Not required in case of in-memory catalog
   }
 
   override def alterAddColumns(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
+      newColumns: Option[Seq[ColumnSchema]]): Unit = {
     val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
     val structType = catalogTable.schema
     var newStructType = structType
@@ -101,8 +100,7 @@
 
   override def alterDropColumns(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
+      dropCols: Option[Seq[ColumnSchema]]): Unit = {
     val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
     val fields = catalogTable.schema.fields.filterNot { field =>
       dropCols.get.exists { col =>
@@ -112,10 +110,9 @@
     alterSchema(new StructType(fields), catalogTable, tableIdentifier)
   }
 
-  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+  override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
+      columns: Option[Seq[ColumnSchema]]): Unit = {
     val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
     val a = catalogTable.schema.fields.flatMap { field =>
       columns.get.map { col =>
@@ -34,6 +34,7 @@ import org.apache.spark.sql.internal.{SQLConf, SessionState}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
+import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
@@ -105,47 +106,37 @@ class CarbonHiveSessionCatalog(
       .asInstanceOf[HiveExternalCatalog].client
   }
 
-  def alterTableRename(oldTableIdentifier: TableIdentifier,
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String): Unit = {
-    getClient().runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
-      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
-    getClient().runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
-      s"SET SERDEPROPERTIES" +
-      s"('tableName'='${ newTableIdentifier.table }', " +
-      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
-  }
-
-  override def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
-    getClient()
-      .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " +
-                  s"SET TBLPROPERTIES(${ schemaParts })")
-  }
-
   override def alterAddColumns(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
+      cols: Option[Seq[ColumnSchema]]): Unit = {
     alterTable(tableIdentifier, schemaParts, cols)
+    CarbonSessionUtil
+      .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier,
+        cols,
+        schemaParts,
+        sparkSession)
   }
 
   override def alterDropColumns(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
+      cols: Option[Seq[ColumnSchema]]): Unit = {
     alterTable(tableIdentifier, schemaParts, cols)
+    CarbonSessionUtil
+      .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier,
+        cols,
+        schemaParts,
+        sparkSession)
   }
 
-  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+  override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
+      cols: Option[Seq[ColumnSchema]]): Unit = {
     alterTable(tableIdentifier, schemaParts, cols)
+    CarbonSessionUtil
+      .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier,
+        cols,
+        schemaParts,
+        sparkSession)
   }
 
   override def createPartitions(
@@ -26,6 +26,11 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSes
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.util.SparkTypeConverter
+
+import scala.collection.mutable.ArrayBuffer
 
 /**
  * This class refreshes the relation from cache if the carbontable in
@@ -93,4 +98,34 @@ object CarbonSessionUtil {
     )
   }
 
+  /**
+   * This method alters the table for a datatype change or column rename operation, and updates
+   * the external catalog directly
+   *
+   * @param tableIdentifier tableIdentifier for table
+   * @param cols            all the columns of the table, updated with the datatype change or
+   *                        new column name
+   * @param schemaParts     schemaParts
+   * @param sparkSession    sparkSession
+   */
+  def alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier: TableIdentifier,
+      cols: Option[Seq[ColumnSchema]],
+      schemaParts: String,
+      sparkSession: SparkSession): Unit = {
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+    val colArray: scala.collection.mutable.ArrayBuffer[StructField] = ArrayBuffer()
+    cols.get.foreach(column =>
+      if (!column.isInvisible) {
+        colArray += StructField(column.getColumnName,
+          SparkTypeConverter
+            .convertCarbonToSparkDataType(column,
+              carbonTable))
+      }
+    )
+    sparkSession.sessionState.catalog.externalCatalog
+      .alterTableDataSchema(tableIdentifier.database.get,
+        tableIdentifier.table,
+        StructType(colArray))
+  }
+
 }
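The new helper pushes a full replacement schema into Spark's external catalog. A rough usage sketch, not part of the commit; the `spark` session, the `updatedCols` list, and the table name are illustrative assumptions:

    import org.apache.spark.sql.catalyst.TableIdentifier

    // updatedCols: the table's complete visible column list, already carrying the
    // new column name/datatype; the helper replaces the whole data schema at once.
    CarbonSessionUtil.alterExternalCatalogForTableWithUpdatedSchema(
      TableIdentifier("t1", Some("default")),  // assumed table
      Some(updatedCols),
      "",                                      // schemaParts: not consulted by the catalog update above
      spark)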
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events.{AlterTableAddColumnPostEvent, AlterTableAddColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
@@ -93,11 +94,14 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       schemaEvolutionEntry.setAdded(newCols.toList.asJava)
       val thriftTable = schemaConverter
         .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+      val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+        .collect { case carbonColumn => carbonColumn.getColumnSchema }
+        .filter(!_.isInvisible)
       val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
         carbonTable,
         schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
         thriftTable,
-        Some(newCols))(sparkSession)
+        Some(carbonColumns ++ newCols))(sparkSession)
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterAddColumns(
         tableIdentifier, schemaParts, cols)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
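Note the reworked argument: `updateSchemaInfo` now receives the table's complete post-ADD schema (existing visible columns plus the new ones), not just the delta. A toy illustration of that list construction, with strings standing in for ColumnSchema objects:

    val existing = Seq("id", "name")       // visible columns in create order
    val newCols = Seq("age")               // columns being added
    val fullSchema = existing ++ newCols   // Seq(id, name, age): what the external catalog is handed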
@@ -214,7 +214,8 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
         carbonTable,
         tableInfo,
         addColumnSchema,
-        schemaEvolutionEntry)
+        schemaEvolutionEntry,
+        oldCarbonColumn.head)
       val alterTableColRenameAndDataTypeChangePostEvent
       : AlterTableColRenameAndDataTypeChangePostEvent =
         AlterTableColRenameAndDataTypeChangePostEvent(sparkSession, carbonTable,
@@ -262,13 +263,24 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       carbonTable: CarbonTable,
       tableInfo: TableInfo,
       addColumnSchema: ColumnSchema,
-      schemaEvolutionEntry: SchemaEvolutionEntry): Unit = {
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      oldCarbonColumn: CarbonColumn): Unit = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
+    val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+      .filter(!_.isInvisible).collect { case carbonColumn => carbonColumn.getColumnSchema }
+    val schemaOrdinal = carbonColumns.collect {
+      case columnSchema if columnSchema.getColumnName
+        .equalsIgnoreCase(oldCarbonColumn.getColName) => carbonColumns.indexOf(columnSchema)
+    }.head
+    carbonColumns
+      .update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
     val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
-      carbonTable, schemaEvolutionEntry, tableInfo, Some(a))(sparkSession)
+      carbonTable,
+      schemaEvolutionEntry,
+      tableInfo,
+      Some(carbonColumns))(sparkSession)
     sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-      .alterColumnChangeDataType(tableIdentifier, schemaParts, cols)
+      .alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, cols)
     sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
   }
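The ordinal lookup above swaps the renamed or retyped column into its original position, so create order is preserved. The same pattern in isolation, again with strings standing in for ColumnSchema objects:

    import scala.collection.mutable.ArrayBuffer

    val cols = ArrayBuffer("id", "name", "age")
    val ordinal = cols.indexWhere(_.equalsIgnoreCase("name"))  // position of the old column
    cols.update(ordinal, "full_name")                          // swap in the renamed column
    // cols: ArrayBuffer(id, full_name, age), order unchanged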

Expand Down
@@ -142,7 +142,10 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
         carbonTable,
         schemaEvolutionEntry,
         tableInfo,
-        Some(delCols))(sparkSession)
+        Some(carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+          .filter(!_.isInvisible).collect {
+            case carbonColumn => carbonColumn.getColumnSchema
+          }.filterNot(column => delCols.contains(column))))(sparkSession)
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
         .alterDropColumns(tableIdentifier, schemaParts, cols)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
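Symmetrically, the drop path now hands `updateSchemaInfo` the complete remaining schema rather than just the deleted columns. In outline:

    val all = Seq("id", "name", "age")                // visible columns in create order
    val delCols = Seq("age")                          // columns being dropped
    val remaining = all.filterNot(delCols.contains)   // Seq(id, name)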
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Expression
 
 import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
 
 /**
  * This interface defines the common APIs used by carbon for spark-2.1 and spark-2.2 integration,
@@ -77,7 +78,16 @@ trait CarbonSessionCatalog {
    */
   def alterTableRename(oldTableIdentifier: TableIdentifier,
       newTableIdentifier: TableIdentifier,
-      newTablePath: String): Unit
+      newTablePath: String): Unit = {
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
+      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
+    getClient().runSqlHive(
+      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table } " +
+      s"SET SERDEPROPERTIES" +
+      s"('tableName'='${ newTableIdentifier.table }', " +
+      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
+  }
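For a concrete feel of the default body above: renaming `db1.old_t` to `db1.new_t` with a new path of `/store/db1/new_t` (all names illustrative, and `catalog` an assumed implementor of this trait) would issue roughly these two Hive statements:

    import org.apache.spark.sql.catalyst.TableIdentifier

    catalog.alterTableRename(
      TableIdentifier("old_t", Some("db1")),
      TableIdentifier("new_t", Some("db1")),
      "/store/db1/new_t")
    // runs: ALTER TABLE db1.old_t RENAME TO db1.new_t
    // then: ALTER TABLE db1.new_t SET SERDEPROPERTIES('tableName'='new_t',
    //       'dbName'='db1', 'tablePath'='/store/db1/new_t')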

 /**
  * Below method will be used to update serde properties
@@ -87,7 +97,11 @@
   */
  def alterTable(tableIdentifier: TableIdentifier,
      schemaParts: String,
-     cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
+     cols: Option[Seq[ColumnSchema]]): Unit = {
+    getClient()
+      .runSqlHive(s"ALTER TABLE ${ tableIdentifier.database.get }.${ tableIdentifier.table } " +
+                  s"SET TBLPROPERTIES(${ schemaParts })")
+  }
 
 /**
  * Below method will be used to add new column
@@ -97,7 +111,7 @@
   */
  def alterAddColumns(tableIdentifier: TableIdentifier,
      schemaParts: String,
-     cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
+     cols: Option[Seq[ColumnSchema]]): Unit
 
 /**
  * Below method will be used to drop column
@@ -107,15 +121,15 @@
   */
  def alterDropColumns(tableIdentifier: TableIdentifier,
      schemaParts: String,
-     cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
+     cols: Option[Seq[ColumnSchema]]): Unit
 
 /**
  * Below method will be used to alter the data type of a column, or to rename it, in the schema
  * @param tableIdentifier table identifier
  * @param schemaParts schema parts
  * @param cols cols
  */
- def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+ def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
      schemaParts: String,
-     cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
+     cols: Option[Seq[ColumnSchema]]): Unit
 }
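For orientation, a minimal hypothetical implementor of this trait. It assumes the trait also declares `getClient()` (the default bodies above call it) and routes every hook through the default `alterTable`, much like the spark-2.1 catalog at the end of this diff; any further abstract members are elided:

    class MySessionCatalog extends CarbonSessionCatalog {
      // Assumed abstract member: the default alterTable/alterTableRename bodies require it.
      override def getClient(): org.apache.spark.sql.hive.client.HiveClient = ???

      override def alterAddColumns(tableIdentifier: TableIdentifier,
          schemaParts: String, cols: Option[Seq[ColumnSchema]]): Unit =
        alterTable(tableIdentifier, schemaParts, cols)

      override def alterDropColumns(tableIdentifier: TableIdentifier,
          schemaParts: String, cols: Option[Seq[ColumnSchema]]): Unit =
        alterTable(tableIdentifier, schemaParts, cols)

      override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
          schemaParts: String, cols: Option[Seq[ColumnSchema]]): Unit =
        alterTable(tableIdentifier, schemaParts, cols)
    }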
@@ -40,6 +40,7 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Experime
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -85,45 +86,23 @@ class CarbonHiveSessionCatalog(
     carbonEnv
   }
 
-  def alterTableRename(oldTableIdentifier: TableIdentifier,
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String): Unit = {
-    getClient().runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table }" +
-      s" RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
-    getClient().runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }" +
-      s" SET SERDEPROPERTIES" +
-      s"('tableName'='${ newTableIdentifier.table }', " +
-      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
-  }
-
-  def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
-    getClient()
-      .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${tableIdentifier.table } " +
-                  s"SET TBLPROPERTIES(${ schemaParts })")
-  }
-
   def alterAddColumns(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+      cols: Option[Seq[ColumnSchema]])
     : Unit = {
     alterTable(tableIdentifier, schemaParts, cols)
   }
 
   def alterDropColumns(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+      cols: Option[Seq[ColumnSchema]])
     : Unit = {
     alterTable(tableIdentifier, schemaParts, cols)
   }
 
-  def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+      cols: Option[Seq[ColumnSchema]])
     : Unit = {
     alterTable(tableIdentifier, schemaParts, cols)
   }
