[CARBONDATA-4318] Improve load overwrite performance for partition tables
Why is this PR needed?
As the number of overwrite loads on a partition table grows, the time taken by each load keeps increasing. This is because:

1. Whenever a load overwrite is fired for a partition table, any existing partitions that overlap with the partitions being loaded must be overwritten or dropped. Since CarbonData stores partition information in the segment files, all previous segment files are read to identify and drop the overlapping partitions, which degrades performance (the old flow is sketched after this list).

2. After a partition load completes, a cleanSegments method is called, which again reads the segment files and the table status file to identify Marked for Delete segments to clean. Since force clean is false and the default timeout is more than a day, it is not necessary to call this method after every load; clean files should handle this part.
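
For reference, the overwrite flow before this change is sketched below (reconstructed from the code removed in this diff; partitionSpecs here are the partitions of the segment just loaded). Every overwrite load lists all valid segments and reads each one's segment file just to drop the overlapping partitions.

// Old overwrite flow, reconstructed from the removed lines in this diff.
List<Segment> validSegments =
    new SegmentStatusManager(table.getAbsoluteTableIdentifier())
        .getValidAndInvalidSegments(table.isMV()).getValidSegments();
String uniqueId = String.valueOf(System.currentTimeMillis());
List<String> toBeUpdatedSegments = new ArrayList<>();
List<String> toBeDeletedSegments = new ArrayList<>();
// every iteration reads another segment file from the store
for (Segment segment : validSegments) {
  new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName()).dropPartitions(
      segment, partitionSpecs, uniqueId, toBeDeletedSegments, toBeUpdatedSegments);
}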

What changes were proposed in this PR?
1. The current partitions of the table are already known, so first check whether any of them overlap with the partitions being loaded. Only if there is an overlap are the segment files read to call dropPartitions; otherwise the segment files are not read unnecessarily (see the sketch after this list). This change also includes refactoring to avoid reading the table status file.
2. There is no need to call clean segments after every load; clean files will take care of deleting the expired segments.
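
A minimal sketch of the new overlap check (variable names follow the diff below; it assumes currentPartitionsOfTable and partitionList have already been deserialized from the job configuration):

// Sketch only: compare the table's current partition locations with the
// partition locations of this load; read segment files only when they overlap.
List<String> overlappingPartitions = currentPartitionsOfTable.stream()
    .map(partitionSpec -> partitionSpec.getLocation().toString())
    .filter(partitionList::contains)
    .collect(Collectors.toList());
if (overlappingPartitions.isEmpty()) {
  // nothing overlaps: just record the new load metadata, no old segment files are read
} else {
  // only now read the affected segment files and call dropPartitions for each segment
}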

This closes #4242
akashrn5 authored and kunal642 committed Dec 29, 2021
1 parent 970f11d commit 308906e459607383eedf20c18236c7d17509959e
Showing 6 changed files with 76 additions and 62 deletions.
@@ -1005,10 +1005,9 @@ public List<CarbonFile> getIndexCarbonFiles() {
* @param uniqueId
* @throws IOException
*/
public void dropPartitions(Segment segment, List<PartitionSpec> partitionSpecs,
public void dropPartitions(String segmentNo, List<String> partitionLocations,
String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
throws IOException {
readSegment(tablePath, segment.getSegmentFileName());
boolean updateSegment = false;
for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
String location = entry.getKey();
@@ -1017,9 +1016,9 @@ public void dropPartitions(Segment segment, List<PartitionSpec> partitionSpecs,
}
Path path = new Path(location);
// Update the status to delete if path equals
if (null != partitionSpecs) {
for (PartitionSpec spec : partitionSpecs) {
if (path.equals(spec.getLocation())) {
if (null != partitionLocations) {
for (String partitionLocation : partitionLocations) {
if (path.toString().equals(partitionLocation)) {
entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
updateSegment = true;
break;
@@ -1031,7 +1030,7 @@ public void dropPartitions(Segment segment, List<PartitionSpec> partitionSpecs,
String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
writePath =
writePath + CarbonCommonConstants.FILE_SEPARATOR +
SegmentFileStore.genSegmentFileName(segment.getSegmentNo(), String.valueOf(uniqueId))
SegmentFileStore.genSegmentFileName(segmentNo, String.valueOf(uniqueId))
+ CarbonTablePath.SEGMENT_EXT;
writeSegmentFile(segmentFile, writePath);
}
@@ -1044,10 +1043,10 @@ public void dropPartitions(Segment segment, List<PartitionSpec> partitionSpecs,
}
}
if (deleteSegment) {
toBeDeletedSegments.add(segment.getSegmentNo());
toBeDeletedSegments.add(segmentNo);
}
if (updateSegment) {
toBeUpdatedSegments.add(segment.getSegmentNo());
toBeUpdatedSegments.add(segmentNo);
}
}

@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -38,7 +39,6 @@
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonSessionInfo;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
@@ -119,9 +119,12 @@ public void commitJob(JobContext context) throws IOException {

boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
List<PartitionSpec> currentPartitionsOfTable = (List<PartitionSpec>) ObjectSerializationUtil
.convertStringToObject(context.getConfiguration().get("carbon.currentpartition"));
if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isHivePartitionTable()) {
try {
commitJobForPartition(context, overwriteSet, loadModel, partitionPath);
commitJobForPartition(context, overwriteSet, loadModel, partitionPath,
currentPartitionsOfTable);
} catch (Exception e) {
CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
LOGGER.error("commit job failed", e);
@@ -191,7 +194,10 @@ public void commitJob(JobContext context) throws IOException {
if (segmentSize == 0) {
newMetaEntry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
}
uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid);
List<String> partitionList = (List<String>) ObjectSerializationUtil
.convertStringToObject(partitionPath);
uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid, partitionList,
currentPartitionsOfTable);
}
} else {
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid, false);
@@ -239,7 +245,8 @@ private void commitJobFinal(JobContext context, CarbonLoadModel loadModel,
* re-factory commitJob flow for partition table
*/
private void commitJobForPartition(JobContext context, boolean overwriteSet,
CarbonLoadModel loadModel, String partitionPath) throws IOException {
CarbonLoadModel loadModel, String partitionPath, List<PartitionSpec> currentPartitionsOfTable)
throws IOException {
String size = context.getConfiguration().get("carbon.datasize", "");
String indexSize = context.getConfiguration().get("carbon.indexsize", "");
if (size.equalsIgnoreCase("0") || indexSize.equalsIgnoreCase("0")) {
@@ -300,7 +307,10 @@ private void commitJobForPartition(JobContext context, boolean overwriteSet,
}
String uniqueId = null;
if (overwriteSet) {
uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid);
List<String> partitionList =
(List<String>) ObjectSerializationUtil.convertStringToObject(partitionPath);
uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid, partitionList,
currentPartitionsOfTable);
} else {
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid, false);
}
@@ -316,31 +326,38 @@ private void commitJobForPartition(JobContext context, boolean overwriteSet,
* of all segment files.
*/
private String overwritePartitions(CarbonLoadModel loadModel, LoadMetadataDetails newMetaEntry,
String uuid) throws IOException {
String uuid, List<String> partitionList, List<PartitionSpec> currentPartitionsOfTable)
throws IOException {
CarbonTable table = loadModel.getCarbonDataLoadSchema().getCarbonTable();
SegmentFileStore fileStore = new SegmentFileStore(loadModel.getTablePath(),
loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp()
+ CarbonTablePath.SEGMENT_EXT);
List<PartitionSpec> partitionSpecs = fileStore.getPartitionSpecs();

if (partitionSpecs != null && partitionSpecs.size() > 0) {
List<Segment> validSegments =
new SegmentStatusManager(table.getAbsoluteTableIdentifier())
.getValidAndInvalidSegments(table.isMV()).getValidSegments();
String uniqueId = String.valueOf(System.currentTimeMillis());
List<String> toBeUpdatedSegments = new ArrayList<>();
List<String> toBeDeletedSegments = new ArrayList<>();
// First drop the partitions from partition mapper files of each segment
for (Segment segment : validSegments) {
new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName()).dropPartitions(
segment, partitionSpecs, uniqueId, toBeDeletedSegments, toBeUpdatedSegments);
if (partitionList != null && partitionList.size() > 0) {
// check if any partitions overlaps
List<String> overlappingPartitions = currentPartitionsOfTable.stream()
.map(partitionSpec -> partitionSpec.getLocation().toString())
.filter(partitionList::contains).collect(Collectors.toList());
if (!overlappingPartitions.isEmpty()) {
List<LoadMetadataDetails> validLoadMetadataDetails =
loadModel.getLoadMetadataDetails().stream().filter(
loadMetadataDetail -> !loadMetadataDetail.getLoadName()
.equalsIgnoreCase(newMetaEntry.getLoadName())).collect(Collectors.toList());
String uniqueId = String.valueOf(System.currentTimeMillis());
List<String> toBeUpdatedSegments = new ArrayList<>(validLoadMetadataDetails.size());
List<String> toBeDeletedSegments = new ArrayList<>(validLoadMetadataDetails.size());
// First drop the partitions from partition mapper files of each segment
for (LoadMetadataDetails loadMetadataDetail : validLoadMetadataDetails) {
new SegmentFileStore(table.getTablePath(), loadMetadataDetail.getSegmentFile())
.dropPartitions(loadMetadataDetail.getLoadName(), partitionList, uniqueId,
toBeDeletedSegments, toBeUpdatedSegments);
}
newMetaEntry.setUpdateStatusFileName(uniqueId);
// Commit the removed partitions in carbon store.
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid,
Segment.toSegmentList(toBeDeletedSegments, null),
Segment.toSegmentList(toBeUpdatedSegments, null), false);
return uniqueId;
} else {
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid, false);
return null;
}
newMetaEntry.setUpdateStatusFileName(uniqueId);
// Commit the removed partitions in carbon store.
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid,
Segment.toSegmentList(toBeDeletedSegments, null),
Segment.toSegmentList(toBeUpdatedSegments, null), false);
return uniqueId;
}
return null;
}
@@ -46,7 +46,7 @@ class CarbonDropPartitionRDD(
@transient private val ss: SparkSession,
tablePath: String,
segments: Seq[Segment],
partitions: util.List[PartitionSpec],
partitionLocations: util.List[String],
uniqueId: String)
extends CarbonRDD[(String, String)](ss, Nil) {

@@ -67,8 +67,8 @@ class CarbonDropPartitionRDD(
new SegmentFileStore(
tablePath,
split.segment.getSegmentFileName).dropPartitions(
split.segment,
partitions,
split.segment.getSegmentNo,
partitionLocations,
uniqueId,
toBeDeletedSegments,
toBeUpdateSegments)
@@ -1078,11 +1078,6 @@ object CommonLoadUtils {
CarbonThreadUtil.threadUnset("partition.operationcontext")
if (loadParams.isOverwriteTable) {
IndexStoreManager.getInstance().clearIndex(table.getAbsoluteTableIdentifier)
// Clean the overwriting segments if any.
SegmentFileStore.cleanSegments(
table,
null,
false)
}
if (partitionsLen > 1) {
// clean cache only if persisted and keeping unpersist non-blocking as non-blocking call
@@ -21,6 +21,7 @@ import java.util

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -29,6 +30,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
import org.apache.spark.util.AlterTableUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.IndexStoreManager
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
@@ -61,7 +63,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
operationContext: OperationContext = new OperationContext)
extends AtomicRunnableCommand {

var carbonPartitionsTobeDropped : util.List[PartitionSpec] = _
var carbonPartitionsTobeDropped : util.List[String] = _
var table: CarbonTable = _
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
lazy val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
@@ -85,15 +87,10 @@ case class CarbonAlterTableDropHivePartitionCommand(
val partitions =
specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName,
Some(CarbonSparkSqlParserUtil.copyTablePartition(f))))
val carbonPartitions = partitions.map { partition =>
new PartitionSpec(
new util.ArrayList[String](
partition.spec.seq.map { case (column, value) =>
column.toLowerCase + "=" + value
}.toList.asJava),
partition.location)
val partitionLocations = partitions.map { partition =>
FileFactory.getUpdatedFilePath(new Path(partition.location).toString)
}
carbonPartitionsTobeDropped = new util.ArrayList[PartitionSpec](carbonPartitions.asJava)
carbonPartitionsTobeDropped = new util.ArrayList[String](partitionLocations.asJava)
withEvents(operationContext,
PreAlterTableHivePartitionCommandEvent(sparkSession, table),
PostAlterTableHivePartitionCommandEvent(sparkSession, table)) {
@@ -517,19 +517,25 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
Option(CarbonCommonConstants.DATABASE_DEFAULT_NAME), tableName)(sqlContext.sparkSession)
val dt1 = "dt1"
sql(s"insert overwrite table $tableName partition(dt='$dt1') select 1, 'a'")
val dt1Metas = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
assert(dt1Metas.length == 1)
val dt1Seg1 = dt1Metas(0)

val dt2 = "dt2"
sql(s"insert overwrite table $tableName partition(dt='$dt2') select 1, 'a'")
val dt2Metas = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
assert(dt2Metas.length == 2)
val dt2Seg1 = dt2Metas(0)
val dt2Seg2 = dt2Metas(1)
val dt1Metas = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)

assert(dt1Seg1.getUpdateDeltaEndTimestamp == dt2Seg1.getUpdateDeltaEndTimestamp)
assert(dt1Seg1.getUpdateDeltaEndTimestamp != dt2Seg2.getUpdateDeltaEndTimestamp)
assert(dt1Metas.length == 2)
val dt1Seg1 = dt1Metas(0)
val dt2Seg1 = dt1Metas(1)

sql(s"insert overwrite table $tableName partition(dt='$dt2') select 5, 'z'")
val dt2Metas = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
assert(dt2Metas.length == 3)
val dt2Seg30 = dt2Metas(0)
val dt2Seg31 = dt2Metas(1)
val dt2Seg2OverWrite = dt2Metas(2)

assert(dt1Seg1.getUpdateDeltaEndTimestamp == dt2Seg30.getUpdateDeltaEndTimestamp)
assert(dt2Seg1.getUpdateDeltaEndTimestamp == dt2Seg31.getUpdateDeltaEndTimestamp)
assert(dt2Seg31.getUpdateDeltaEndTimestamp != dt2Seg2OverWrite.getUpdateDeltaEndTimestamp)
sql(s"drop table if exists $tableName")
}
