Skip to content
Permalink
Browse files
[CARBONDATA-4277] geo instance compatability fix
Why is this PR needed?
The CustomIndex interface extends Serializable and for different
version store, if the serialization id doesn't match, it throws
java.io.InvalidClassException during load/update/query operations.

What changes were proposed in this PR?
As the instance is stored in table properties, made changes to
initialize and update instance while refresh table. Also added
static serialId for the CustomIndex interface.

Does this PR introduce any user interface change?
No

Is any new testcase added?
No, tested in cluster

This closes #4216
  • Loading branch information
ShreelekhyaG authored and Indhumathi27 committed Sep 16, 2021
1 parent 4d8bc9e commit 719935795f2dbc165cca304a47a0b456aac54e8c
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 14 deletions.
@@ -35,6 +35,8 @@
* @param <ReturnType>
*/
public abstract class CustomIndex<ReturnType> implements Serializable {

private static final long serialVersionUID = 6529685098267757692L;
/**
* Initialize the custom index instance.
* @param indexName
@@ -78,6 +78,8 @@ Note:
* `mygeohash` in the above example represent the index name.
* Columns present in spatial_index table properties cannot be altered
i.e., sourcecolumns: `longitude, latitude` and index column: `mygeohash` in the above example.
* To make the spatial instance compatible with previous versions, trigger refresh table command.
In direct upgrade scenario, if spatial table already exists then refresh command fails but updates the instance property in metadata.

#### List of spatial index table properties

@@ -91,6 +91,23 @@ object CarbonParserUtil {
}
}

def initializeSpatialIndexInstance(spatialIndexClassName: String, indexName: String,
tableProperties: mutable.Map[String, String]): Unit = {
val SPATIAL_INDEX_INSTANCE = s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.instance"
try {
val spatialIndexClass : Class[_] = java.lang.Class.forName(spatialIndexClassName)
val instance = spatialIndexClass.newInstance().asInstanceOf[CustomIndex[_]]
instance.init(indexName, tableProperties.asJava)
tableProperties.put(SPATIAL_INDEX_INSTANCE, CustomIndex.getCustomInstance(instance))
} catch {
case ex@(_: ClassNotFoundException | _: InstantiationError | _: IllegalAccessException |
_: ClassCastException) =>
val err = s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property process failed. "
LOGGER.error(err, ex)
throw new MalformedCarbonCommandException(err, ex)
}
}

/**
* The method parses, validates and processes the spatial_index property.
* @param tableProperties Table properties
@@ -170,18 +187,7 @@ object CarbonParserUtil {
s"Unsupported value: ${ spatialIndexType.get } specified for property $TYPE.")
}
}
try {
val spatialIndexClass : Class[_] = java.lang.Class.forName(spatialIndexClassName)
val instance = spatialIndexClass.newInstance().asInstanceOf[CustomIndex[_]]
instance.init(indexName, tableProperties.asJava)
tableProperties.put(SPATIAL_INDEX_INSTANCE, CustomIndex.getCustomInstance(instance))
} catch {
case ex@(_: ClassNotFoundException | _: InstantiationError | _: IllegalAccessException |
_: ClassCastException) =>
val err = s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property process failed. "
LOGGER.error(err, ex)
throw new MalformedCarbonCommandException(err, ex)
}
initializeSpatialIndexInstance(spatialIndexClassName, indexName, tableProperties)
// Insert spatial column as a sort column if it is not already present in it.
CarbonScalaUtil.insertColumnToSortColumns(indexName, sources, tableProperties)
fields += Field(indexName, Some("BigInt"), Some(indexName), Some(null), spatialIndex = true)
@@ -21,12 +21,13 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.CarbonParserUtil.initializeSpatialIndexInstance
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.util.SparkUtil
import org.apache.spark.util.{AlterTableUtil, SparkUtil}

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -83,6 +84,27 @@ case class RefreshCarbonTableCommand(
if (FileFactory.isFileExist(schemaFilePath)) {
// read TableInfo
val tableInfo = SchemaReader.getTableInfo(identifier)
val tableProperties = tableInfo.getFactTable.getTableProperties
val spatialIndex = CarbonCommonConstants.SPATIAL_INDEX
val indexName = tableProperties.get(spatialIndex)
if (indexName != null) {
val SPATIAL_INDEX_CLASS = s"$spatialIndex.$indexName.class"
val SPATIAL_INDEX_INSTANCE = s"$spatialIndex.$indexName.instance"
// For spatial table, To make the instance compatible with previous versions,
// initialise and update the index instance in table properties.
tableProperties.remove(SPATIAL_INDEX_INSTANCE)
initializeSpatialIndexInstance(tableProperties.get(SPATIAL_INDEX_CLASS),
indexName, tableProperties.asScala)
val tableIdentifier = new TableIdentifier(tableName, Some(tableInfo.getDatabaseName))
if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
// In direct upgrade scenario, if spatial table already exists then on refresh command,
// update the property in metadata and fail table creation.
LOGGER.info(s"Updating $SPATIAL_INDEX_INSTANCE table property on $tableName")
AlterTableUtil.modifyTableProperties(tableIdentifier,
Map(SPATIAL_INDEX_INSTANCE -> tableProperties.get(SPATIAL_INDEX_INSTANCE)),
Seq.empty, true)(sparkSession, sparkSession.sessionState.catalog)
}
}
// remove mv related info from source table properties
tableInfo.getFactTable
.getTableProperties.remove(CarbonCommonConstants.RELATED_MV_TABLES_MAP)
@@ -485,7 +485,9 @@ object AlterTableUtil {
// with the newly added/modified comment since thriftTable also holds comment as its
// direct property.
lowerCasePropertiesMap.foreach { property =>
if (validateTableProperties(property._1)) {
if (validateTableProperties(property._1) ||
(property._1.startsWith(CarbonCommonConstants.SPATIAL_INDEX) &&
property._1.endsWith("instance"))) {
tblPropertiesMap.put(property._1, property._2)
} else {
val errorMessage = "Error: Invalid option(s): " + property._1.toString()

0 comments on commit 7199357

Please sign in to comment.