[CARBONDATA-4316] Fix horizontal compaction failure for partition tables
Why is this PR needed?
Horizontal compaction fails for partition tables, which leaves many delete
delta files per block and slows down queries. The failure occurs because,
during horizontal compaction, the delete delta file path prepared for a
partition table is wrong, so the path cannot be resolved and the operation
fails.

What changes were proposed in this PR?
For a partition table, read the segment file and identify the partition that
contains the block, then use it to build the correct partition path.
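
A minimal sketch of the added partition-path resolution, assuming the helper name
resolvePartitionPath is illustrative (the actual change is inlined in
SegmentUpdateStatusManager.getDeleteDeltaFilesList) and the same imports as the
patch (java.io, java.util, SegmentFileStore, FileFactory, CarbonUpdateUtil):

// Illustrative helper; the real fix inlines this logic in
// SegmentUpdateStatusManager#getDeleteDeltaFilesList.
private String resolvePartitionPath(String tablePath, String segmentFileName,
    String blockName) throws IOException {
  // Read the segment file to obtain the index-file -> data-file mapping.
  SegmentFileStore segmentFileStore = new SegmentFileStore(tablePath, segmentFileName);
  segmentFileStore.readIndexFiles(SegmentStatus.SUCCESS, false,
      FileFactory.getConfiguration());
  for (Map.Entry<String, List<String>> entry :
      segmentFileStore.getIndexFilesMap().entrySet()) {
    for (String blockFile : entry.getValue()) {
      // The data file path carries the partition directory, e.g. .../c3=aa/part-...
      String fileName = blockFile.substring(blockFile.lastIndexOf(File.separator) + 1);
      if (blockName.equalsIgnoreCase(CarbonUpdateUtil.getBlockName(fileName))) {
        // Keep only the partition directory by stripping the file name.
        return blockFile.substring(0, blockFile.lastIndexOf(File.separator));
      }
    }
  }
  return null; // block not found in this segment
}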

This closes #4240
akashrn5 authored and kunal642 committed Dec 22, 2021
1 parent f266a73 commit d629dc0b894a64bfbef762736775a182e40827fe
Showing 5 changed files with 69 additions and 11 deletions.
@@ -19,6 +19,7 @@

import java.io.*;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -33,6 +34,7 @@
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
@@ -364,7 +366,8 @@ public boolean accept(CarbonFile pathName) {
* @param blockName the specified block of the segment
* @return delete delta file list of the block
*/
public List<String> getDeleteDeltaFilesList(final Segment segment, final String blockName) {
public List<String> getDeleteDeltaFilesList(final Segment segment, final String blockName)
throws IOException {
List<String> deleteDeltaFileList = new ArrayList<>();
String segmentPath = null;
if (segment.isExternalSegment()) {
@@ -374,6 +377,25 @@ public List<String> getDeleteDeltaFilesList(final Segment segment, final String
break;
}
}
} else if (isPartitionTable) {
String segmentFileName = Arrays.stream(segmentDetails).filter(
loadMetaDataDetail -> loadMetaDataDetail.getLoadName()
.equalsIgnoreCase(segment.getSegmentNo())).collect(Collectors.toList()).get(0)
.getSegmentFile();
SegmentFileStore segmentFileStore =
new SegmentFileStore(identifier.getTablePath(), segmentFileName);
segmentFileStore.readIndexFiles(SegmentStatus.SUCCESS, false, FileFactory.getConfiguration());
for (Map.Entry<String, List<String>> entry : segmentFileStore.getIndexFilesMap().entrySet()) {
List<String> matchedBlocksInPartition = entry.getValue().stream().filter(blockFile -> {
String blockFileName = blockFile.substring(blockFile.lastIndexOf(File.separator) + 1);
return blockName.equalsIgnoreCase(CarbonUpdateUtil.getBlockName(blockFileName));
}).collect(Collectors.toList());
if (matchedBlocksInPartition.size() > 0) {
segmentPath = matchedBlocksInPartition.get(0)
.substring(0, matchedBlocksInPartition.get(0).lastIndexOf(File.separator));
break;
}
}
} else {
segmentPath = CarbonTablePath.getSegmentPath(
identifier.getTablePath(), segment.getSegmentNo());
@@ -94,6 +94,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
LockUsage.UPDATE_LOCK)
var lockStatus = false
var hasException = false
var deletedRows = 0L;
try {
lockStatus = metadataLock.lockWithRetries()
if (lockStatus) {
@@ -118,6 +119,8 @@ private[sql] case class CarbonProjectForDeleteCommand(
isUpdateOperation = false,
executorErrors)

deletedRows = deletedRowCount;

// Check for any failures occurred during delete delta execution
if (executorErrors.failureCauses != FailureCauses.NONE) {
throw new Exception(executorErrors.errorMsg)
@@ -141,14 +144,16 @@ private[sql] case class CarbonProjectForDeleteCommand(
val deleteFromTablePostEvent: DeleteFromTablePostEvent =
DeleteFromTablePostEvent(sparkSession, carbonTable)
OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
Seq(Row(deletedRowCount))
Seq(Row(deletedRows))
} catch {
case e: HorizontalCompactionException =>
LOGGER.error("Delete operation passed. Exception in Horizontal Compaction." +
" Please check logs. " + e.getMessage)
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
hasException = true
Seq(Row(0L))
// if only the horizontal compaction fails, return the deleted row count,
// since the delete operation itself has succeeded.
Seq(Row(deletedRows))

case e: Exception =>
LOGGER.error("Exception in Delete data operation " + e.getMessage, e)
@@ -112,9 +112,7 @@ object HorizontalCompaction {
val db = carbonTable.getDatabaseName
val table = carbonTable.getTableName
val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
absTableIdentifier,
segmentUpdateStatusManager,
compactionTypeIUD)
segmentUpdateStatusManager)
if (LOG.isDebugEnabled) {
LOG.debug(s"The segment list for Horizontal Update Compaction is $deletedBlocksList")
}
@@ -468,6 +468,40 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"drop table if exists ${ tableName }").collect()
}

test("test partition table delete and horizontal compaction") {
sql("drop table if exists iud_db.partition_hc")
sql(
"create table iud_db.partition_hc (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) " +
"STORED AS carbondata")
sql(
"insert into iud_db.partition_hc values ('a',1,'aaa','aa'),('a',5,'aaa','aa'),('a',9,'aaa'," +
"'aa'),('a',4,'aaa','aa'),('a',2,'aaa','aa'),('a',3,'aaa'," +
"'aa')")
sql("delete from iud_db.partition_hc where c2 = 1").show()
sql("delete from iud_db.partition_hc where c2 = 5").show()
checkAnswer(
sql("""select c2 from iud_db.partition_hc"""),
Seq(Row(9), Row(4), Row(2), Row(3))
)
// verify if the horizontal compaction happened or not
val carbonTable = CarbonEnv.getCarbonTable(Some("iud_db"), "partition_hc")(sqlContext
.sparkSession)
val partitionPath = carbonTable.getTablePath + "/c3=aa"
val deltaFiles = FileFactory.getCarbonFile(partitionPath).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
}
})
assert(deltaFiles.size == 3)
val updateStatusFiles = FileFactory.getCarbonFile(CarbonTablePath.getMetadataPath(carbonTable
.getTablePath)).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
}
})
assert(updateStatusFiles.size == 3)
}

override def afterAll {
sql("use default")
sql("drop database if exists iud_db cascade")
@@ -875,13 +875,11 @@ private static boolean isSegmentValid(LoadMetadataDetails seg) {
/**
* method gets the segments list which get qualified for IUD compaction.
* @param segments
* @param absoluteTableIdentifier
* @param compactionTypeIUD
* @return
*/
public static List<String> getSegListIUDCompactionQualified(List<Segment> segments,
AbsoluteTableIdentifier absoluteTableIdentifier,
SegmentUpdateStatusManager segmentUpdateStatusManager, CompactionType compactionTypeIUD) {
SegmentUpdateStatusManager segmentUpdateStatusManager)
throws IOException {

List<String> validSegments = new ArrayList<>();

@@ -906,7 +904,8 @@ public static List<String> getSegListIUDCompactionQualified(List<Segment> segmen
* @return block list of the segment
*/
private static List<String> checkDeleteDeltaFilesInSeg(Segment seg,
SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold)
throws IOException {

List<String> blockLists = new ArrayList<>();
Set<String> uniqueBlocks = new HashSet<String>();
