[CARBONDATA-4344] Create MV fails with "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: does not exist in table. Please check the DDL" error

Why is this PR needed?
Create MV fails with the "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: does not exist in table. Please check the DDL" error.
The error occurs only in this scenario: Create Table --> Load --> Alter Add Columns --> Drop Table --> Refresh Table --> Create MV,
and not in the direct scenario: Create Table --> Load --> Alter Add Columns --> Create MV. A minimal sketch of the failing sequence is shown below.
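
A minimal sketch of the failing sequence, in the style of the new test added in this PR
(table and MV names are illustrative; dblocation is the parent directory of the table path,
and backUpData/restoreData are the test helpers this PR moves into QueryTest):

  sql("CREATE TABLE source1 (empno int, empname string) STORED AS CARBONDATA")
  sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE source1")
  sql("ALTER TABLE source1 ADD COLUMNS(a int, b string)")
  backUpData(dblocation, None, "source1")
  sql("DROP TABLE source1")
  restoreData(dblocation, "source1")
  sql("REFRESH TABLE source1")
  // Before this fix, the next statement failed with:
  // "LOCAL_DICTIONARY_EXCLUDE column: does not exist in table. Please check the DDL"
  sql("CREATE MATERIALIZED VIEW mv1 AS SELECT b, sum(empno) FROM source1 GROUP BY b")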

What changes were proposed in this PR?
1. After the add column command, the LOCAL_DICTIONARY_INCLUDE and LOCAL_DICTIONARY_EXCLUDE
   properties are added to the table even when their column lists are empty. When an MV is
   created afterwards, the empty LOCAL_DICTIONARY_EXCLUDE property counts as defined, so
   the create command tries to resolve its columns and fails.
   --> Added an empty check before adding these properties to the table to resolve this.
2. In the direct scenario, after add column the schema is updated in the catalog table but
   the table properties are not. Made changes to also update the table properties in the
   catalog table. A condensed sketch of both fixes follows this list.
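
A condensed sketch of the two fixes; the identifiers are taken from the diff below, with the
surrounding context simplified:

  // Fix 1 (AlterTableColumnSchemaGenerator): persist the combined local-dictionary
  // lists only when they are non-empty, so an empty LOCAL_DICTIONARY_INCLUDE/
  // LOCAL_DICTIONARY_EXCLUDE property is never written to the table properties.
  if (localDictionaryExcludeColumn.toString().nonEmpty) {
    tablePropertiesMap.put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE,
      localDictionaryExcludeColumn.toString())
  }

  // Fix 2 (CarbonSessionUtil): after the schema update in the catalog, also push the
  // current carbon table properties into the catalog table so the two stay in sync.
  CarbonSessionCatalogUtil.alterTableProperties(
    sparkSession, tableIdentifier,
    carbonTable.getTableInfo.getFactTable.getTableProperties.asScala.toMap, Seq.empty)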

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #4282
ShreelekhyaG authored and Indhumathi27 committed Jun 27, 2022
1 parent 93b0af2 commit 858afc7eb60508de1f9d5fc8df06099e83df3c15
Showing 7 changed files with 78 additions and 83 deletions.
@@ -16,9 +16,6 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
-import java.io.{File, IOException}
-
-import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
@@ -35,29 +32,6 @@ class TestRegisterIndexCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("drop database if exists carbon cascade")
}

private def restoreData(dblocation: String, tableName: String) = {
val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
val source = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
try {
FileUtils.copyDirectory(new File(source), new File(destination))
FileUtils.deleteDirectory(new File(source))
} catch {
case e : Exception =>
throw new IOException("carbon table data restore failed.")
} finally {

}
}
private def backUpData(dblocation: String, tableName: String) = {
val source = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
val destination = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
try {
FileUtils.copyDirectory(new File(source), new File(destination))
} catch {
case e : Exception =>
throw new IOException("carbon table data backup failed.")
}
}
test("register tables test") {
val location = TestQueryExecutor.warehouse +
CarbonCommonConstants.FILE_SEPARATOR + "dbName"
@@ -68,8 +42,8 @@ class TestRegisterIndexCarbonTable extends QueryTest with BeforeAndAfterAll {
"c1 string,c2 int,c3 string,c5 string) STORED AS carbondata")
sql("insert into carbontable select 'a',1,'aa','aaa'")
sql("create index index_on_c3 on table carbontable (c3, c5) AS 'carbondata'")
backUpData(location, "carbontable")
backUpData(location, "index_on_c3")
backUpData(location, None, "carbontable")
backUpData(location, None, "index_on_c3")
sql("drop table carbontable")
restoreData(location, "carbontable")
restoreData(location, "index_on_c3")
@@ -514,10 +514,14 @@ class AlterTableColumnSchemaGenerator(

     // The Final Map should contain the combined Local Dictionary Include and
     // Local Dictionary Exclude Columns from both Main table and Alter table
-    tablePropertiesMap
-      .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, localDictionaryIncludeColumn.toString())
-    tablePropertiesMap
-      .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, localDictionaryExcludeColumn.toString())
+    if (localDictionaryIncludeColumn.toString().nonEmpty) {
+      tablePropertiesMap.put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE,
+        localDictionaryIncludeColumn.toString())
+    }
+    if (localDictionaryExcludeColumn.toString().nonEmpty) {
+      tablePropertiesMap.put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE,
+        localDictionaryExcludeColumn.toString())
+    }
     // This part will create dictionary file for all newly added dictionary columns
     // if valid default value is provided,
     // then that value will be included while creating dictionary file
@@ -173,6 +173,10 @@ object CarbonSessionUtil {
       .alterTableDataSchema(tableIdentifier.database.get,
         tableIdentifier.table,
         StructType(colArray))
+    // Updates the table properties in catalog table.
+    CarbonSessionCatalogUtil.alterTableProperties(
+      sparkSession, tableIdentifier,
+      carbonTable.getTableInfo.getFactTable.getTableProperties.asScala.toMap, Seq.empty)
   }
 
   def updateCachedPlan(plan: LogicalPlan): LogicalPlan = {
@@ -17,10 +17,12 @@

 package org.apache.spark.sql.test.util
 
+import java.io.{File, IOException}
 import java.util.{Locale, TimeZone}
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
@@ -221,6 +223,31 @@ class QueryTest extends PlanTest {
       }
     }
   }
+
+  def restoreData(dblocation: String, tableName: String): Unit = {
+    val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    val source = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    try {
+      FileUtils.copyDirectory(new File(source), new File(destination))
+      FileUtils.deleteDirectory(new File(source))
+    } catch {
+      case e: Exception =>
+        throw new IOException("carbon table data restore failed.")
+    } finally {
+
+    }
+  }
+
+  def backUpData(dblocation: String, database: Option[String], tableName: String): Unit = {
+    val source = CarbonEnv.getTablePath(database, tableName)(sqlContext.sparkSession)
+    val destination = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    try {
+      FileUtils.copyDirectory(new File(source), new File(destination))
+    } catch {
+      case e: Exception =>
+        throw new IOException("carbon table data backup failed.")
+    }
+  }
 }
 
 object QueryTest {
@@ -405,7 +405,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     val partitions = sql("show partitions restorepartition").collect()
     val table = CarbonMetadata.getInstance().getCarbonTable("default_restorepartition")
     val dblocation = table.getTablePath.substring(0, table.getTablePath.lastIndexOf("/"))
-    backUpData(dblocation, "restorepartition")
+    backUpData(dblocation, None, "restorepartition")
     sql("drop table restorepartition")
     if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore) {
       restoreData(dblocation, "restorepartition")
@@ -741,31 +741,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     (Strings.formatSize(dataSize.toFloat), Strings.formatSize(indexSize.toFloat))
   }
 
-  private def restoreData(dblocation: String, tableName: String) = {
-    val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    val source = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    try {
-      FileUtils.copyDirectory(new File(source), new File(destination))
-      FileUtils.deleteDirectory(new File(source))
-    } catch {
-      case e : Exception =>
-        throw new IOException("carbon table data restore failed.")
-    } finally {
-
-    }
-  }
-
-  private def backUpData(dblocation: String, tableName: String) = {
-    val source = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    val destination = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    try {
-      FileUtils.copyDirectory(new File(source), new File(destination))
-    } catch {
-      case e : Exception =>
-        throw new IOException("carbon table data backup failed.", e)
-    }
-  }
-
 
   override def afterAll: Unit = {
     CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct",
@@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
@@ -1556,6 +1557,41 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     assert(TestUtil.verifyMVHit(df.queryExecution.optimizedPlan, "decimal_mv"))
   }
 
+  test("test create MV after alter add column, drop table and refresh") {
+    sql("drop table if exists source1")
+    sql("CREATE table source1 (empno int, empname String, " +
+        "designation String, doj Timestamp, workgroupcategory int, " +
+        "workgroupcategoryname String, deptno int, deptname String, projectcode int, " +
+        "projectjoindate Timestamp, projectenddate Timestamp, attendance int, " +
+        "utilization int,salary int) STORED AS CARBONDATA")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO " +
+        "TABLE source1 OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', " +
+        "'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='FORCE')")
+    sql("alter table source1 add columns(a int, b string) tblproperties " +
+        "('LONG_STRING_COLUMNS'='b')")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_source1")
+    val dblocation = table.getTablePath.substring(0, table.getTablePath.lastIndexOf("/"))
+    sql("drop MATERIALIZED VIEW if exists uniq2_mv")
+    sql("create MATERIALIZED VIEW uniq2_mv as " +
+        "select b, sum(empno) from source1 group by b")
+    val desc = sql("describe formatted uniq2_mv").collect
+    desc.find(_.get(0).toString.contains("LONG_STRING_COLUMNS")) match {
+      case Some(row) => assert(row.get(1).toString.contains("source1_b"))
+      case None => assert(false)
+    }
+    backUpData(dblocation, None, "source1")
+    sql("drop table source1")
+    if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore) {
+      restoreData(dblocation, "source1")
+      sql("refresh table source1")
+    }
+    sql("drop MATERIALIZED VIEW if exists uniq2_mv")
+    sql("create MATERIALIZED VIEW uniq2_mv as " +
+        "select b, sum(empno) from source1 group by b")
+    sql("drop MATERIALIZED VIEW if exists uniq2_mv")
+    sql("drop table if exists source1")
+  }
+
   def copy(oldLoc: String, newLoc: String): Unit = {
     val oldFolder = FileFactory.getCarbonFile(oldLoc)
     FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
@@ -38,31 +38,6 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterEach {
sql("set carbon.enable.mv = true")
}

private def restoreData(dblocation: String, tableName: String) = {
val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
val source = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
try {
FileUtils.copyDirectory(new File(source), new File(destination))
FileUtils.deleteDirectory(new File(source))
} catch {
case e : Exception =>
throw new IOException("carbon table data restore failed.")
} finally {

}
}

private def backUpData(dblocation: String, database: Option[String], tableName: String) = {
val source = CarbonEnv.getTablePath(database, tableName)(sqlContext.sparkSession)
val destination = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
try {
FileUtils.copyDirectory(new File(source), new File(destination))
} catch {
case e : Exception =>
throw new IOException("carbon table data backup failed.")
}
}

test("register tables test") {
sql(s"create database carbon location '$dbLocation'")
sql("use carbon")
