Skip to content
Permalink
Browse files
[CARBONDATA-4329] Fix multiple issues with External table
Why is this PR needed?
Issue 1:
When an external table is created on a transactional table's location,
a schema file is already present. While creating the external table,
which is also transactional, the existing schema file is overwritten

Issue 2:
If an external table is created on a location where the source table
already exists, dropping the external table deletes the table data,
and subsequent queries on the source table fail

What changes were proposed in this PR?
Avoid writing the schema file if the table is both external and transactional
Don't drop the data at the external table's location if table_type is external

This closes #4255
  • Loading branch information
Indhumathi27 authored and kunal642 committed Apr 1, 2022
1 parent d6ce946 commit 46b62cf6f79d1d826b498609435337b2ed342bbe
Showing 8 changed files with 147 additions and 16 deletions.
@@ -880,8 +880,7 @@ public boolean isMV() {
* an internal table property set during table creation)
*/
public boolean isExternalTable() {
  // Delegate to TableInfo, which owns the "_external" table property
  // (an internal property set during table creation). The previous inline
  // lookup of the property here was left as unreachable diff residue and
  // is removed; a single source of truth avoids the two drifting apart.
  return tableInfo.isExternal();
}

public boolean isFileLevelFormat() {
@@ -289,6 +289,10 @@ public boolean isTransactionalTable() {
return isTransactionalTable;
}

/**
 * Returns whether this table was created as an external table.
 * Reads the internal "_external" table property; when the property is
 * absent the table is treated as a managed (non-external) table.
 */
public boolean isExternal() {
  String externalProperty =
      factTable.getTableProperties().getOrDefault("_external", "false");
  return Boolean.parseBoolean(externalProperty);
}

/** Sets whether this table is transactional. */
public void setTransactionalTable(boolean transactionalTable) {
  this.isTransactionalTable = transactionalTable;
}
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.optimizer.CarbonFilters
@@ -54,17 +55,29 @@ case class CarbonCountStar(
CarbonInputFormat.setQuerySegment(job.getConfiguration, carbonTable)

// get row count
var rowCount = CarbonUpdateUtil.getRowCount(
tableInputFormat.getBlockRowCount(
job,
carbonTable,
CarbonFilters.getPartitions(
Seq.empty,
sparkSession,
TableIdentifier(
carbonTable.getTableName,
Some(carbonTable.getDatabaseName))).map(_.toList.asJava).orNull, false),
carbonTable)
var rowCount = try {
CarbonUpdateUtil.getRowCount(
tableInputFormat.getBlockRowCount(
job,
carbonTable,
CarbonFilters.getPartitions(
Seq.empty,
sparkSession,
TableIdentifier(
carbonTable.getTableName,
Some(carbonTable.getDatabaseName))).map(_.toList.asJava).orNull, false),
carbonTable)
} catch {
case ex: NoSuchTableException =>
if (carbonTable.isExternalTable && carbonTable.isTransactionalTable) {
// In case, external table is created on transactional table location and if the original
// table is dropped, then while trying to read schema, it will error exception, as file
// does not exists. In that case, just return 0
0L
} else {
throw ex
}
}

if (CarbonProperties.isQueryStageInputEnabled) {
// check for number of row for stage input
@@ -425,7 +425,9 @@ object CarbonSource {
*/
def saveCarbonSchemaFile(
metaStore: CarbonMetaStore, ignoreIfExists: Boolean, tableInfo: TableInfo): Unit = {
if (!metaStore.isReadFromHiveMetaStore && tableInfo.isTransactionalTable) {
// if table is external transactional table, do not overwrite schema
if (!metaStore.isReadFromHiveMetaStore && tableInfo.isTransactionalTable &&
!tableInfo.isExternal) {
try {
metaStore.saveToDisk(tableInfo, tableInfo.getTablePath)
} catch {
@@ -152,7 +152,7 @@ case class CarbonDropTableCommand(
// clear driver side index and dictionary cache
if (!EnvHelper.isLegacy(sparkSession)
&& carbonTable != null
&& !(carbonTable.isMV && !dropChildTable)) {
&& !(carbonTable.isMV && !dropChildTable) && !carbonTable.isExternalTable) {
// delete the table folder
CarbonInternalMetastore.deleteTableDirectory(carbonTable)
// Delete lock directory if external lock path is specified.
@@ -319,6 +319,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
schemaRefreshTime = FileFactory
.getCarbonFile(tableMetadataFile).getLastModifiedTime
// set external property to table info from catalog table properties
if (parameters.contains("isExternal")) {
wrapperTableInfo.getFactTable
.getTableProperties
.put("_external", parameters("isExternal"))
}
Some(wrapperTableInfo)
} else {
None
@@ -262,7 +262,7 @@ object CarbonSparkSqlParserUtil {
if (provider.equalsIgnoreCase("'carbonfile'")) {
tableInfo.getFactTable.getTableProperties.put("_filelevelformat", "true")
tableInfo.getFactTable.getTableProperties.put("_external", "false")
} else {
} else if (!table.properties.contains("hasexternalkeyword")) {
tableInfo.getFactTable.getTableProperties.put("_external", "true")
tableInfo.getFactTable.getTableProperties.put("_filelevelformat", "false")
}
@@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties

class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll {
@@ -229,4 +230,110 @@ class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists source")
}

// Regression test for Issue 1 (schema file must not be overwritten when an
// external table is created on a transactional table's location) and Issue 2
// (dropping the external table must not delete the shared location's data).
test("test create external table on transactional table location") {
sql("DROP TABLE IF EXISTS table1")
sql("DROP TABLE IF EXISTS table2")
sql("DROP TABLE IF EXISTS table3")
try {
sql("create table table1 (roll string) STORED AS carbondata")
sql("insert into table1 values('abc')")
val table1 = CarbonEnv.getCarbonTable(Some("default"), "table1")(sqlContext.sparkSession)
// capture the schema file's modified time before the external table is created
val lastMdtFileTable1 = FileFactory
.getCarbonFile(FileFactory.getUpdatedFilePath(table1.getTablePath + "/Metadata/schema"))
.getLastModifiedTime
sql(
s"""CREATE EXTERNAL TABLE table2 STORED AS carbondata
| LOCATION
|'${ table1.getTablePath }' """.stripMargin)
val table2 = CarbonEnv.getCarbonTable(Some("default"), "table2")(sqlContext.sparkSession)
val lastMdtFileTable2 = FileFactory
.getCarbonFile(FileFactory.getUpdatedFilePath(table2.getTablePath + "/Metadata/schema"))
.getLastModifiedTime
// Issue 1: creating the external table must not rewrite the schema file,
// so the modified time is unchanged
assert(lastMdtFileTable1 == lastMdtFileTable2)
checkAnswer(sql("select * from table1"), sql("select * from table2"))
// verify insert into table1 and check results of table1 and table2
sql("insert into table1 values('abcd')")
checkAnswer(sql("select * from table1"), sql("select * from table2"))
// verify delete from table1 and check results of table1 and table2
sql("delete from table1 where roll='abc'")
checkAnswer(sql("select * from table1"), sql("select * from table2"))
// verify insert into table2 and check results of table1 and table2
sql("insert into table2 values('abcde')")
checkAnswer(sql("select * from table1"), sql("select * from table2"))
// delete the row inserted via table2 (delete is issued through table1;
// both tables share the same location so the effect is visible in both)
// NOTE(review): original comment said "delete from table2" — possibly
// intended sql("delete from table2 ..."); confirm intent with author
sql("delete from table1 where roll='abcde'")
val res = sql("select * from table1")
checkAnswer(sql("select * from table1"), sql("select * from table2"))
// Issue 2: dropping external table2 must not delete the shared data,
// so table1 still has its remaining row
sql("DROP TABLE IF EXISTS table2")
checkAnswer(sql("select count(*) from table1"), Seq(Row(1)))
checkAnswer(sql("select * from table1"), res)
sql(
s"""CREATE EXTERNAL TABLE table3 STORED AS carbondata
| LOCATION
|'${ table1.getTablePath }' """.stripMargin)
checkAnswer(sql("select * from table1"), sql("select * from table3"))
// dropping managed table1 removes the data, so external table3 sees no rows
sql("DROP TABLE IF EXISTS table1")
checkAnswer(sql("select count(*) from table3"), Seq(Row(0)))
} finally {
sql("DROP TABLE IF EXISTS table1")
sql("DROP TABLE IF EXISTS table2")
sql("DROP TABLE IF EXISTS table3")
}
}

// Same regression coverage as the non-partitioned test above, but for a
// partitioned source table: schema file must not be overwritten (Issue 1)
// and dropping the external table must not delete shared data (Issue 2).
test("test create external table on transactional partition table location") {
sql("DROP TABLE IF EXISTS table1")
sql("DROP TABLE IF EXISTS table2")
sql("DROP TABLE IF EXISTS table3")
try {
sql("create table table1 (name string) partitioned by (dept int) STORED AS carbondata")
sql("insert into table1 values('abc', 1)")
val table1 = CarbonEnv.getCarbonTable(Some("default"), "table1")(sqlContext.sparkSession)
// capture the schema file's modified time before the external table is created
val lastMdtFileTable1 = FileFactory
.getCarbonFile(FileFactory.getUpdatedFilePath(table1.getTablePath + "/Metadata/schema"))
.getLastModifiedTime
sql(
s"""CREATE EXTERNAL TABLE table2(name string) partitioned by (dept int) STORED AS carbondata
| LOCATION
|'${ table1.getTablePath }' """.stripMargin)
val table2 = CarbonEnv.getCarbonTable(Some("default"), "table2")(sqlContext.sparkSession)
val lastMdtFileTable2 = FileFactory
.getCarbonFile(FileFactory.getUpdatedFilePath(table2.getTablePath + "/Metadata/schema"))
.getLastModifiedTime
// Issue 1: creating the external table must not rewrite the schema file
assert(lastMdtFileTable1 == lastMdtFileTable2)
checkAnswer(sql("select * from table1"), sql("select * from table2"))
// verify insert into table1 and check results of table1 and table2
sql("insert into table1 values('abcd', 2)")
checkAnswer(sql("select * from table1"), sql("select * from table2"))
// verify delete from table1 and check results of table1 and table2
sql("delete from table1 where name='abc'")
checkAnswer(sql("select * from table1"), sql("select * from table2"))
// verify insert into table2 and check results of table1 and table2
sql("insert into table2 values('abcde', 2)")
checkAnswer(sql("select * from table1"), sql("select * from table2"))
// delete the row inserted via table2 (delete is issued through table1;
// both tables share the same location so the effect is visible in both)
// NOTE(review): original comment said "delete from table2" — possibly
// intended sql("delete from table2 ..."); confirm intent with author
sql("delete from table1 where name='abcde'")
val res = sql("select * from table1")
checkAnswer(sql("select * from table1"), sql("select * from table2"))
// Issue 2: dropping external table2 must not delete the shared data
sql("DROP TABLE IF EXISTS table2")
checkAnswer(sql("select count(*) from table1"), Seq(Row(1)))
checkAnswer(sql("select * from table1"), res)
sql(
s"""CREATE EXTERNAL TABLE table3(name string) partitioned by (dept int) STORED AS carbondata
| LOCATION
|'${ table1.getTablePath }' """.stripMargin)
checkAnswer(sql("select * from table1"), sql("select * from table3"))
// dropping managed table1 removes the data, so external table3 sees no rows
sql("DROP TABLE IF EXISTS table1")
checkAnswer(sql("select count(*) from table3"), Seq(Row(0)))
} finally {
sql("DROP TABLE IF EXISTS table1")
sql("DROP TABLE IF EXISTS table2")
sql("DROP TABLE IF EXISTS table3")
}
}

}

0 comments on commit 46b62cf

Please sign in to comment.