[CARBONDATA-3793] Fix update and delete issue when multiple partition columns are present and clean files issue

Why is this PR needed?
1. After #3837, when a table has multiple partition columns, update and delete do not work because the
tuple ID is treated as an external-segment tuple ID: with multiple partitions the TID contains the # character.
2. When multiple segments are present and only some of them are updated, clean files deletes the segment
files of the non-updated segments, treating them as stale files.

What changes were proposed in this PR?
1. To double-check for an external segment, also verify that the path in the segment metadata details is not null (see the first sketch below).
2. Delete the old segment files of only the updated segments (see the second sketch below).
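A minimal sketch of the first change, for illustration only: the class, helper method and sample tuple IDs below are hypothetical and not the CarbonData API; only the condition itself (contains "#/" plus a non-null path in the segment metadata details) comes from this commit.

// Illustrative sketch only; class name, method and sample TIDs are hypothetical.
public final class ExternalSegmentTidCheckSketch {

  // A tuple ID is taken as an external-segment TID only if it embeds an external
  // path ("#/") AND the segment metadata details record a non-null path.
  // A plain '#' is not enough: multi-partition tables also put '#' into the TID.
  static boolean isExternalSegmentTid(String tid, String segmentMetadataPath) {
    return tid.contains("#/") && segmentMetadataPath != null;
  }

  public static void main(String[] args) {
    // Hypothetical TID of a row in a table partitioned on region, country and city.
    String multiPartitionTid = "region=r1#country=c1#city=ct1/0/0-100100000100001/0/0";
    // Hypothetical TID of a row in an added (external) segment.
    String externalSegmentTid = "2#/tmp/external/part-0-0/0/0";

    System.out.println(isExternalSegmentTid(multiPartitionTid, null));             // false
    System.out.println(isExternalSegmentTid(externalSegmentTid, "/tmp/external")); // true
  }
}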

This closes #3911
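
For the second change, a similarly hedged sketch (class and method names are mine; only the underscore split and the two filter conditions are taken from the diff): during clean files, a segment file becomes a deletion candidate only if it is not in the keep set and its segment number belongs to a segment that was actually updated, so non-updated segments keep their segment files.

// Illustrative sketch only; not the CarbonData implementation.
import java.util.Set;

public final class CleanFilesFilterSketch {

  // Segment file names follow the pattern "<segmentNo>_<timestamp>", e.g. "0_1597411003332".
  static String segmentNoFromSegmentFile(String segmentFileName) {
    return segmentFileName.split("_")[0];
  }

  // A segment file is stale only if it is not among the files to keep AND it belongs
  // to an updated segment; files of non-updated segments are left untouched.
  static boolean isStale(String fileName, Set<String> segmentFilesNotToDelete,
      Set<String> updatedSegmentIds) {
    return !segmentFilesNotToDelete.contains(fileName)
        && updatedSegmentIds.contains(segmentNoFromSegmentFile(fileName));
  }

  public static void main(String[] args) {
    Set<String> keep = Set.of("1_1597412488102");
    Set<String> updated = Set.of("1");
    System.out.println(isStale("0_1597411003332", keep, updated)); // false: segment 0 was not updated
    System.out.println(isStale("1_1597411003332", keep, updated)); // true: old file of updated segment 1
  }
}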
akashrn5 authored and kunal642 committed Sep 14, 2020
1 parent 0527b76 commit b9a1398
Showing 5 changed files with 58 additions and 12 deletions.
@@ -90,7 +90,9 @@ public static String getSegmentWithBlockFromTID(String Tid, boolean isPartitionT
// in add segment case, it will be in second index as the blockletID is generated by adding the
// complete external path
// this is in case of the external segment, where the tuple id has external path with #
if (Tid.contains("#")) {
// here no need to check for any path present in metadata details, as # can come in tuple id in
// case of multiple partitions, so partition check is already present above.
if (Tid.contains("#/")) {
return getRequiredFieldFromTID(Tid, TupleIdEnum.EXTERNAL_SEGMENT_ID)
+ CarbonCommonConstants.FILE_SEPARATOR + getRequiredFieldFromTID(Tid,
TupleIdEnum.EXTERNAL_BLOCK_ID);
@@ -620,6 +622,8 @@ public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) thr
CarbonFile segmentFilesLocation =
FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(table.getTablePath()));
Set<String> segmentFilesNotToDelete = new HashSet<>();
Set<String> updatedSegmentIDs = new HashSet<>(Arrays.asList(
segmentFilesToBeUpdated.stream().map(Segment::getSegmentNo).toArray(String[]::new)));
for (Segment segment : segmentFilesToBeUpdated) {
SegmentFileStore fileStore =
new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName());
@@ -636,7 +640,8 @@ public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) thr
CarbonFile[] invalidSegmentFiles = segmentFilesLocation.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
return !segmentFilesNotToDelete.contains(file.getName());
return !segmentFilesNotToDelete.contains(file.getName()) && updatedSegmentIDs
.contains(CarbonTablePath.DataFileUtil.getSegmentNoFromSegmentFile(file.getName()));
}
});
for (CarbonFile invalidSegmentFile : invalidSegmentFiles) {
@@ -563,6 +563,13 @@ public static String getFileName(String dataFilePath) {
}
}

/**
* This method returns the segment number from the segment file name
*/
public static String getSegmentNoFromSegmentFile(String segmentFileName) {
return segmentFileName.split(CarbonCommonConstants.UNDERSCORE)[0];
}

/**
* gets segment id from given absolute data file path
*/
@@ -30,11 +30,11 @@
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -947,6 +947,12 @@ public void testTupeIDInUpdateScenarios() {
Assert.assertEquals(CarbonUpdateUtil.getSegmentBlockNameKey("0", blockName, true), "0-0_0-0-0-1597412488102");
}

@Test public void testSegmentNumberFromSegmentFile() {
String segmentFileName = "0_1597411003332";
Assert.assertEquals("0",
CarbonTablePath.DataFileUtil.getSegmentNoFromSegmentFile(segmentFileName));
}

private String generateString(int length) {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < length; i++) {
@@ -234,7 +234,7 @@ object DeleteExecution {
TupleIdEnum.BLOCKLET_ID.getTupleIdIndex),
Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
TupleIdEnum.PAGE_ID.getTupleIdIndex)))
} else if (TID.contains("#")) {
} else if (TID.contains("#/") && load.getPath != null) {
// this is in case of the external segment, where the tuple id has external path with #
(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_OFFSET),
CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_BLOCKLET_ID),
@@ -275,7 +275,7 @@ object DeleteExecution {
columnCompressor = CompressorFactory.getInstance.getCompressor.getName
}
var blockNameFromTupleID =
if (TID.contains("#")) {
if (TID.contains("#/") && load.getPath != null) {
CarbonUpdateUtil.getRequiredFieldFromTID(TID,
TupleIdEnum.EXTERNAL_BLOCK_ID)
} else {
@@ -18,17 +18,15 @@ package org.apache.carbondata.spark.testsuite.iud

import java.io.File

import org.apache.spark.sql.test.SparkTestQueryExecutor
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode}
import org.scalatest.{BeforeAndAfterAll, ConfigMap}

import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode}
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath

class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
@@ -76,12 +74,23 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata""")
sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows""")
sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows""")
sql("insert into iud.zerorows select 'abc',34,'def','des'")
sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show()
sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 = 'b'""").show()
sql("clean files for table iud.zerorows")
checkAnswer(
sql("""select c1,c2,c3,c5 from iud.zerorows"""),
Seq(Row("a",2,"aa","aaa"),Row("b",3,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"),Row("a",2,"aa","aaa"),Row("b",3,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
Seq(Row("a", 2, "aa", "aaa"),
Row("abc", 34, "def", "des"),
Row("b", 3, "bb", "bbb"),
Row("c", 3, "cc", "ccc"),
Row("d", 4, "dd", "ddd"),
Row("e", 5, "ee", "eee"),
Row("a", 2, "aa", "aaa"),
Row("b", 3, "bb", "bbb"),
Row("c", 3, "cc", "ccc"),
Row("d", 4, "dd", "ddd"),
Row("e", 5, "ee", "eee"))
)
sql("""drop table iud.zerorows""")
}
@@ -906,6 +915,25 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists test_return_row_count_source")
}

test("test update on a table with multiple partition directories") {
sql("drop table if exists partitionMultiple")
import sqlContext.implicits._
val df = sqlContext.sparkContext.parallelize(1 to 4, 4)
.map { x => (s"name$x", s"$x", s"region$x", s"country$x", s"city$x")
}.toDF("name", "age", "region", "country", "city")
df.write.format("carbondata")
.option("tableName", "partitionMultiple")
.option("partitionColumns", "region, country, city")
.mode(SaveMode.Overwrite)
.save()
checkAnswer(sql("delete from partitionMultiple where name = 'name2'"), Seq(Row(1)))
checkAnswer(sql("update partitionMultiple set(name) = ('Joey') where age = 3"), Seq(Row(1)))
checkAnswer(sql("select * from partitionMultiple"),
Seq(Row("name1", "1", "region1", "country1", "city1"),
Row("name4", "4", "region4", "country4", "city4"),
Row("Joey", "3", "region3", "country3", "city3")))
}

test("test update for partition table without merge index files for segment") {
try {
sql("DROP TABLE IF EXISTS iud.partition_nomerge_index")