Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CARBONDATA-2219] Added validation for external partition location to use same schema. #2018

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,22 @@ public void readAllIIndexOfSegment(String segmentPath) throws IOException {
/**
* Read all index files and keep the cache in it.
*
* @param segmentFileStore
* @param segmentFile
* @throws IOException
*/
public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status,
boolean ignoreStatus) throws IOException {
public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, String tablePath,
SegmentStatus status, boolean ignoreStatus) throws IOException {
List<CarbonFile> carbonIndexFiles = new ArrayList<>();
if (segmentFileStore.getLocationMap() == null) {
if (segmentFile == null) {
return;
}
for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore
for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFile
.getLocationMap().entrySet()) {
String location = locations.getKey();

if (locations.getValue().getStatus().equals(status.getMessage()) || ignoreStatus) {
if (locations.getValue().isRelative()) {
location =
segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location;
location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
}
for (String indexFile : locations.getValue().getFiles()) {
CarbonFile carbonFile = FileFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
Expand Down Expand Up @@ -314,7 +315,7 @@ private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws I
}
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
indexFilesMap = new HashMap<>();
indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, ignoreStatus);
Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
Expand All @@ -328,6 +329,30 @@ private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws I
}
}

/**
 * Reads every index file referenced by the given segment file and returns the column
 * schema recorded in each one.
 *
 * @param segmentFile segment metadata listing the index-file locations; may be null
 * @param tablePath   table path used by the index-file store to resolve relative locations
 * @return map from index-file path to the column schemas of that file's first footer;
 *         empty map when {@code segmentFile} is null or no footers are found
 * @throws IOException if an index file cannot be read
 */
public static Map<String, List<ColumnSchema>> getSchemaFiles(SegmentFile segmentFile,
    String tablePath) throws IOException {
  Map<String, List<ColumnSchema>> schemaMap = new HashMap<>();
  if (segmentFile == null) {
    return schemaMap;
  }
  SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
  // ignoreStatus = true: read index files of all locations regardless of segment status.
  indexFileStore.readAllIIndexOfSegment(segmentFile, tablePath, SegmentStatus.SUCCESS, true);
  Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
  DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
  for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
    List<DataFileFooter> indexInfo =
        fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
    // Only the first footer's schema is recorded per index file; presumably all
    // footers within one index file share the same schema — TODO confirm.
    if (!indexInfo.isEmpty()) {
      schemaMap.put(entry.getKey(), indexInfo.get(0).getColumnInTable());
    }
  }
  return schemaMap;
}

/**
* Gets all index files from this segment
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.exception.ProcessMetaDataException

object CarbonPartitionExample {

Expand Down Expand Up @@ -195,7 +196,7 @@ object CarbonPartitionExample {
try {
spark.sql("""SHOW PARTITIONS t1""").show(100, false)
} catch {
case ex: AnalysisException => LOGGER.error(ex.getMessage())
case ex: ProcessMetaDataException => LOGGER.error(ex.getMessage())
}
spark.sql("""SHOW PARTITIONS t0""").show(100, false)
spark.sql("""SHOW PARTITIONS t3""").show(100, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,17 @@ test("Creation of partition table should fail if the colname in table schema and
val location = metastoredb +"/" +"ravi"
sql(s"""alter table staticpartitionlocload add partition (empname='ravi') location '$location'""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionlocload")
val frame = sql("select count(empno) from staticpartitionlocload")
verifyPartitionInfo(frame, Seq("empname=ravi"))
assert(frame.count() == 10)
checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(10)))
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(20)))
val file = FileFactory.getCarbonFile(location)
assert(file.exists())
FileFactory.deleteAllCarbonFilesOfDir(file)
}

test("add external partition with static column partition with load command") {
test("add external partition with static column partition with load command with diffrent schema") {

sql(
"""
Expand All @@ -324,18 +326,43 @@ test("Creation of partition table should fail if the colname in table schema and
| PARTITIONED BY (empname String)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)
sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
verifyPartitionInfo(frame, Seq("empname=ravi"))
assert(frame.count() == 10)
val location2 = storeLocation +"/staticpartitionlocloadother/empname=indra"
sql(s"""alter table staticpartitionextlocload add partition (empname='indra') location '$location2'""")
val frame1 = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
verifyPartitionInfo(frame1, Seq("empname=indra"))
assert(frame1.count() == 20)
intercept[Exception] {
sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
}
assert(sql(s"show partitions staticpartitionextlocload").count() == 0)
val file = FileFactory.getCarbonFile(location)
assert(file.exists())
FileFactory.deleteAllCarbonFilesOfDir(file)
if(file.exists()) {
FileFactory.deleteAllCarbonFilesOfDir(file)
}
}

// Verifies that an external partition location survives a DROP PARTITION (retainData)
// and can be re-added, with row counts checked after each step.
test("add external partition with static column partition with load command") {

sql(
"""
| CREATE TABLE staticpartitionlocloadother_new (empno int, designation String,
| workgroupcategory int, workgroupcategoryname String, deptno int,
| projectjoindate Timestamp,attendance int,
| deptname String,projectcode int,
| utilization int,salary int,projectenddate Date,doj Timestamp)
| PARTITIONED BY (empname String)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)
// External location for the 'ravi' partition, outside the table's own store path.
val location = metastoredb +"/" +"ravi1"
sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
// Load 10 rows into each of the two static partitions (20 rows total).
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='indra') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
// Dropping the external partition removes its rows from query results...
sql(s"""ALTER TABLE staticpartitionlocloadother_new DROP PARTITION(empname='ravi')""")
checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(10)))
// ...but the data files remain at the external location, so re-adding the same
// location restores the original 10 rows.
sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
// A fresh load into the re-added partition appends another 10 rows.
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(30)))
// Clean up the external partition directory created by this test.
val file = FileFactory.getCarbonFile(location)
if(file.exists()) {
FileFactory.deleteAllCarbonFilesOfDir(file)
}
}

test("drop partition on preAggregate table should fail"){
Expand Down Expand Up @@ -387,6 +414,8 @@ test("Creation of partition table should fail if the colname in table schema and
sql("drop table if exists staticpartitionlocload")
sql("drop table if exists staticpartitionextlocload")
sql("drop table if exists staticpartitionlocloadother")
sql("drop table if exists staticpartitionextlocload_new")
sql("drop table if exists staticpartitionlocloadother_new")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ case class CarbonAlterTableAddHivePartitionCommand(


override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
AlterTableDropPartitionCommand(tableName, partitionSpecsAndLocs.map(_._1), true, false, true)
AlterTableDropPartitionCommand(
tableName,
partitionSpecsAndLocs.map(_._1),
ifExists = true,
purge = false,
retainData = true).run(sparkSession)
val msg = s"Got exception $exception when processing data of add partition." +
"Dropping partitions to the metadata"
LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
Expand All @@ -88,6 +93,17 @@ case class CarbonAlterTableAddHivePartitionCommand(
val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath,
partitionSpecsAndLocsTobeAdded)
if (segmentFile != null) {
val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
val tableColums = table.getTableInfo.getFactTable.getListOfColumns.asScala
var isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) =>
columnSchemas.asScala.exists { col =>
tableColums.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
} && columnSchemas.size() == tableColums.length
}
if (!isSameSchema) {
throw new UnsupportedOperationException(
"Schema of index files located in location is not matching with current table schema")
}
val loadModel = new CarbonLoadModel
loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
// Create new entry in tablestatus file
Expand Down