From dc6e0666445717cef935feec8159a019b1dfd0c9 Mon Sep 17 00:00:00 2001 From: akashrn5 Date: Thu, 27 Dec 2018 11:31:44 +0530 Subject: [PATCH] update the schema to session catalog after add column, drop column and column rename --- .../sql/hive/CarbonInMemorySessionState.scala | 15 +++--- .../spark/sql/hive/CarbonSessionState.scala | 49 ++++++++----------- .../spark/sql/hive/CarbonSessionUtil.scala | 35 +++++++++++++ .../CarbonAlterTableAddColumnCommand.scala | 8 ++- ...rTableColRenameDataTypeChangeCommand.scala | 24 +++++++-- .../CarbonAlterTableDropColumnCommand.scala | 6 ++- .../spark/sql/hive/CarbonSessionCatalog.scala | 26 +++++++--- .../spark/sql/hive/CarbonSessionState.scala | 31 ++---------- 8 files changed, 117 insertions(+), 77 deletions(-) diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala index ba6aae525ee..da60fb05925 100644 --- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala +++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema} import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.format.TableInfo @@ -79,15 +80,13 @@ class InMemorySessionCatalog( override def alterTable(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) - : Unit = { + cols: Option[Seq[ColumnSchema]]): Unit = { // NOt Required in case of In-memory catalog } override def alterAddColumns(tableIdentifier: TableIdentifier, schemaParts: String, - newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) - : Unit = { + newColumns: Option[Seq[ColumnSchema]]): Unit = { val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier) val structType = catalogTable.schema var newStructType = structType @@ -101,8 +100,7 @@ class InMemorySessionCatalog( override def alterDropColumns(tableIdentifier: TableIdentifier, schemaParts: String, - dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) - : Unit = { + dropCols: Option[Seq[ColumnSchema]]): Unit = { val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier) val fields = catalogTable.schema.fields.filterNot { field => dropCols.get.exists { col => @@ -112,10 +110,9 @@ class InMemorySessionCatalog( alterSchema(new StructType(fields), catalogTable, tableIdentifier) } - override def alterColumnChangeDataType(tableIdentifier: TableIdentifier, + override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier, schemaParts: String, - columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) - : Unit = { + columns: Option[Seq[ColumnSchema]]): Unit = { val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier) val a = catalogTable.schema.fields.flatMap { field => columns.get.map { col => diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala index f3168d7e0e4..f502a6cef99 100644 --- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.internal.{SQLConf, SessionState} import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} import org.apache.spark.sql.parser.CarbonSparkSqlParser +import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema} import org.apache.carbondata.spark.util.CarbonScalaUtil /** @@ -105,47 +106,37 @@ class CarbonHiveSessionCatalog( .asInstanceOf[HiveExternalCatalog].client } - def alterTableRename(oldTableIdentifier: TableIdentifier, - newTableIdentifier: TableIdentifier, - newTablePath: String): Unit = { - getClient().runSqlHive( - s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " + - s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }") - getClient().runSqlHive( - s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " + - s"SET SERDEPROPERTIES" + - s"('tableName'='${ newTableIdentifier.table }', " + - s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')") - } - - override def alterTable(tableIdentifier: TableIdentifier, - schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) - : Unit = { - getClient() - .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " + - s"SET TBLPROPERTIES(${ schemaParts })") - } - override def alterAddColumns(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) - : Unit = { + cols: Option[Seq[ColumnSchema]]): Unit = { alterTable(tableIdentifier, schemaParts, cols) + CarbonSessionUtil + .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier, + cols, + schemaParts, + sparkSession) } override def alterDropColumns(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) - : Unit = { + cols: Option[Seq[ColumnSchema]]): Unit = { alterTable(tableIdentifier, schemaParts, cols) + CarbonSessionUtil + .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier, + cols, + schemaParts, + sparkSession) } - override def alterColumnChangeDataType(tableIdentifier: TableIdentifier, + override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) - : Unit = { + cols: Option[Seq[ColumnSchema]]): Unit = { alterTable(tableIdentifier, schemaParts, cols) + CarbonSessionUtil + .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier, + cols, + schemaParts, + sparkSession) } override def createPartitions( diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala index 1a22e99d536..3fee36673db 100644 --- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala +++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala @@ -26,6 +26,11 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSes import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.util.SparkTypeConverter + +import scala.collection.mutable.ArrayBuffer /** * This class refresh the relation from cache if the carbontable in @@ -93,4 +98,34 @@ object CarbonSessionUtil { ) } + /** + * This method alter the table for datatype change or column rename operation, and update the + * external catalog directly + * + * @param tableIdentifier tableIdentifier for table + * @param cols all the column of table, which are updated with datatype change of + * new column name + * @param schemaParts schemaParts + * @param sparkSession sparkSession + */ + def alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier: TableIdentifier, + cols: Option[Seq[ColumnSchema]], + schemaParts: String, + sparkSession: SparkSession): Unit = { + val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession) + val colArray: scala.collection.mutable.ArrayBuffer[StructField] = ArrayBuffer() + cols.get.foreach(column => + if (!column.isInvisible) { + colArray += StructField(column.getColumnName, + SparkTypeConverter + .convertCarbonToSparkDataType(column, + carbonTable)) + } + ) + sparkSession.sessionState.catalog.externalCatalog + .alterTableDataSchema(tableIdentifier.database.get, + tableIdentifier.table, + StructType(colArray)) + } + } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala index 9c92614549e..d4e16c5bf09 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala @@ -93,11 +93,17 @@ private[sql] case class CarbonAlterTableAddColumnCommand( schemaEvolutionEntry.setAdded(newCols.toList.asJava) val thriftTable = schemaConverter .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) + // carbon columns based on schema order + val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala + .collect { case carbonColumn => carbonColumn.getColumnSchema } + .filter(!_.isInvisible) + // sort the new columns based on schema order + val sortedColsBasedActualSchemaOrder = newCols.sortBy(a => a.getSchemaOrdinal) val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo( carbonTable, schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), thriftTable, - Some(newCols))(sparkSession) + Some(carbonColumns ++ sortedColsBasedActualSchemaOrder))(sparkSession) sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterAddColumns( tableIdentifier, schemaParts, cols) sparkSession.catalog.refreshTable(tableIdentifier.quotedString) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala index 49d5093d656..e20f30fa3da 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala @@ -214,7 +214,8 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand( carbonTable, tableInfo, addColumnSchema, - schemaEvolutionEntry) + schemaEvolutionEntry, + oldCarbonColumn.head) val alterTableColRenameAndDataTypeChangePostEvent : AlterTableColRenameAndDataTypeChangePostEvent = AlterTableColRenameAndDataTypeChangePostEvent(sparkSession, carbonTable, @@ -262,13 +263,26 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand( carbonTable: CarbonTable, tableInfo: TableInfo, addColumnSchema: ColumnSchema, - schemaEvolutionEntry: SchemaEvolutionEntry): Unit = { + schemaEvolutionEntry: SchemaEvolutionEntry, + oldCarbonColumn: CarbonColumn): Unit = { val schemaConverter = new ThriftWrapperSchemaConverterImpl - val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema)) + // get the carbon column in schema order + val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala + .filter(!_.isInvisible).collect{case carbonColumn => carbonColumn.getColumnSchema} + // get the schema ordinal of the column for which the datatype changed or column is renamed + val schemaOrdinal = carbonColumns.collect { + case columnSchema if columnSchema.getColumnName + .equalsIgnoreCase(oldCarbonColumn.getColName) => carbonColumns.indexOf(columnSchema)}.head + // update the schema changed column at the specific index in carbonColumns based on schemaorder + carbonColumns + .update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema)) val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo( - carbonTable, schemaEvolutionEntry, tableInfo, Some(a))(sparkSession) + carbonTable, + schemaEvolutionEntry, + tableInfo, + Some(carbonColumns))(sparkSession) sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] - .alterColumnChangeDataType(tableIdentifier, schemaParts, cols) + .alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, cols) sparkSession.catalog.refreshTable(tableIdentifier.quotedString) } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala index 9ef6fd8c858..80eba510b0b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala @@ -142,7 +142,11 @@ private[sql] case class CarbonAlterTableDropColumnCommand( carbonTable, schemaEvolutionEntry, tableInfo, - Some(delCols))(sparkSession) + // get the columns in schema order and filter the dropped column in the column set + Some(carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala + .filter(!_.isInvisible).collect { + case carbonColumn => carbonColumn.getColumnSchema + }.filterNot(column => delCols.contains(column))))(sparkSession) sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] .alterDropColumns(tableIdentifier, schemaParts, cols) sparkSession.catalog.refreshTable(tableIdentifier.quotedString) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala index f00739e4ed7..20d43df0670 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability} +import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema} /** * This interface defines those common api used by carbon for spark-2.1 and spark-2.2 integration, @@ -77,7 +78,16 @@ trait CarbonSessionCatalog { */ def alterTableRename(oldTableIdentifier: TableIdentifier, newTableIdentifier: TableIdentifier, - newTablePath: String): Unit + newTablePath: String): Unit = { + getClient().runSqlHive( + s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " + + s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }") + getClient().runSqlHive( + s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table } " + + s"SET SERDEPROPERTIES" + + s"('tableName'='${ newTableIdentifier.table }', " + + s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')") + } /** * Below method will be used to update serd properties @@ -87,7 +97,11 @@ trait CarbonSessionCatalog { */ def alterTable(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit + cols: Option[Seq[ColumnSchema]]): Unit = { + getClient() + .runSqlHive(s"ALTER TABLE ${ tableIdentifier.database.get }.${ tableIdentifier.table } " + + s"SET TBLPROPERTIES(${ schemaParts })") + } /** * Below method will be used to add new column @@ -97,7 +111,7 @@ trait CarbonSessionCatalog { */ def alterAddColumns(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit + cols: Option[Seq[ColumnSchema]]): Unit /** * Below method will be used to drop column @@ -107,7 +121,7 @@ trait CarbonSessionCatalog { */ def alterDropColumns(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit + cols: Option[Seq[ColumnSchema]]): Unit /** * Below method will be used to alter data type of column in schema @@ -115,7 +129,7 @@ trait CarbonSessionCatalog { * @param schemaParts schema parts * @param cols cols */ - def alterColumnChangeDataType(tableIdentifier: TableIdentifier, + def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit + cols: Option[Seq[ColumnSchema]]): Unit } diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index 47b6b671519..47feae05516 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Experime import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.util.CarbonScalaUtil @@ -85,45 +86,23 @@ class CarbonHiveSessionCatalog( carbonEnv } - def alterTableRename(oldTableIdentifier: TableIdentifier, - newTableIdentifier: TableIdentifier, - newTablePath: String): Unit = { - getClient().runSqlHive( - s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table }" + - s" RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }") - getClient().runSqlHive( - s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }" + - s" SET SERDEPROPERTIES" + - s"('tableName'='${ newTableIdentifier.table }', " + - s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')") - } - - def alterTable(tableIdentifier: TableIdentifier, - schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) - : Unit = { - getClient() - .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${tableIdentifier.table } " + - s"SET TBLPROPERTIES(${ schemaParts })") - } - def alterAddColumns(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + cols: Option[Seq[ColumnSchema]]) : Unit = { alterTable(tableIdentifier, schemaParts, cols) } def alterDropColumns(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + cols: Option[Seq[ColumnSchema]]) : Unit = { alterTable(tableIdentifier, schemaParts, cols) } - def alterColumnChangeDataType(tableIdentifier: TableIdentifier, + def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier, schemaParts: String, - cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + cols: Option[Seq[ColumnSchema]]) : Unit = { alterTable(tableIdentifier, schemaParts, cols) }