Skip to content

Commit

Permalink
Fix DataLoading to MV table when Yarn-Application is killed
Browse files Browse the repository at this point in the history
  • Loading branch information
Indhumathi27 committed Jun 28, 2019
1 parent 188e7e4 commit 1cd7722
Show file tree
Hide file tree
Showing 32 changed files with 107 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,15 @@ public boolean rebuild() throws IOException, NoSuchDataMapException {
}
String newLoadName = "";
String segmentMap = "";
AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier = AbsoluteTableIdentifier
.from(dataMapSchema.getRelationIdentifier().getTablePath(),
CarbonTable dataMapTable = CarbonTable
.buildFromTablePath(dataMapSchema.getRelationIdentifier().getTableName(),
dataMapSchema.getRelationIdentifier().getDatabaseName(),
dataMapSchema.getRelationIdentifier().getTableName());
dataMapSchema.getRelationIdentifier().getTablePath(),
dataMapSchema.getRelationIdentifier().getTableId());
AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier =
dataMapTable.getAbsoluteTableIdentifier();
// Clean up the old invalid segment data before creating a new entry for new load.
SegmentStatusManager.deleteLoadsAndUpdateMetadata(dataMapTable, false, null);
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(dataMapTableAbsoluteTableIdentifier);
Map<String, List<String>> segmentMapping = new HashMap<>();
Expand All @@ -148,6 +153,15 @@ public boolean rebuild() throws IOException, NoSuchDataMapException {
CarbonTablePath.getMetadataPath(dataMapSchema.getRelationIdentifier().getTablePath());
LoadMetadataDetails[] loadMetaDataDetails =
SegmentStatusManager.readLoadMetadata(dataMapTableMetadataPath);
// Mark all stale loadMetadataDetails for delete
for (LoadMetadataDetails loadMetadataDetail : loadMetaDataDetails) {
if ((loadMetadataDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
|| loadMetadataDetail.getSegmentStatus()
== SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) && loadMetadataDetail.getVisibility()
.equalsIgnoreCase("false")) {
loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
}
}
List<LoadMetadataDetails> listOfLoadFolderDetails =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
Collections.addAll(listOfLoadFolderDetails, loadMetaDataDetails);
Expand Down Expand Up @@ -223,7 +237,7 @@ public boolean rebuild() throws IOException, NoSuchDataMapException {
+ " during table status updation");
}
}
return rebuildInternal(newLoadName, segmentMapping);
return rebuildInternal(newLoadName, segmentMapping, dataMapTable);
}

/**
Expand Down Expand Up @@ -395,5 +409,6 @@ public DataMapCatalog createDataMapCatalog() {

public abstract boolean supportRebuild();

public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap);
public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
CarbonTable carbonTable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvali
CarbonTable carbonTable, Configuration configuration) throws IOException {
SegmentStatusManager ssm =
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration);
return ssm.getValidAndInvalidSegments();
return ssm.getValidAndInvalidSegments(carbonTable.isChildTable());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;

Expand All @@ -55,14 +56,16 @@ public static boolean canDataMapBeEnabled(DataMapSchema dataMapSchema) throws IO
SegmentStatusManager.readLoadMetadata(metaDataPath);
Map<String, List<String>> dataMapSegmentMap = new HashMap<>();
for (LoadMetadataDetails loadMetadataDetail : dataMapLoadMetadataDetails) {
Map<String, List<String>> segmentMap =
DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
if (dataMapSegmentMap.isEmpty()) {
dataMapSegmentMap.putAll(segmentMap);
} else {
for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
if (null != dataMapSegmentMap.get(entry.getKey())) {
dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.SUCCESS) {
Map<String, List<String>> segmentMap =
DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
if (dataMapSegmentMap.isEmpty()) {
dataMapSegmentMap.putAll(segmentMap);
} else {
for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
if (null != dataMapSegmentMap.get(entry.getKey())) {
dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId
if (toBeDeleteSegments.size() > 0 || toBeUpdatedSegments.size() > 0) {
Set<Segment> segmentSet = new HashSet<>(
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
.getValidAndInvalidSegments().getValidSegments());
.getValidAndInvalidSegments(carbonTable.isChildTable()).getValidSegments());
CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
Segment.toSegmentList(toBeDeleteSegments, null),
Segment.toSegmentList(toBeUpdatedSegments, null), uuid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,18 @@ public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier identi
}

public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
return getValidAndInvalidSegments(null, null);
return getValidAndInvalidSegments(false, null, null);
}

public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable)
throws IOException {
return getValidAndInvalidSegments(isChildTable, null, null);
}

/**
* get valid segment for given load status details.
*/
public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable,
LoadMetadataDetails[] loadMetadataDetails, ReadCommittedScope readCommittedScope)
throws IOException {

Expand Down Expand Up @@ -162,9 +167,21 @@ public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
continue;
}
listOfValidSegments.add(
new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
segment));
// In case of a child table, during loading, if no record is loaded to the segment, then the
// segmentStatus will be marked as 'Success'. During query, there is no need to add that segment
// to the validSegment list, as the segment does not exist
if (isChildTable) {
if (!segment.getDataSize().equalsIgnoreCase("0") && !segment.getIndexSize()
.equalsIgnoreCase("0")) {
listOfValidSegments.add(
new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
segment));
}
} else {
listOfValidSegments.add(
new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
segment));
}
} else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
|| SegmentStatus.COMPACTED == segment.getSegmentStatus()
|| SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3143,7 +3143,7 @@ public static ColumnarFormatVersion getFormatVersion(CarbonTable carbonTable) th
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
segmentStatusManager.getValidAndInvalidSegments();
segmentStatusManager.getValidAndInvalidSegments(carbonTable.isChildTable());
List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
if (validSegments.isEmpty()) {
return carbonProperties.getFormatVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ public void deleteDatamapData() {
SegmentStatusManager ssm =
new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
try {
List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
List<Segment> validSegments =
ssm.getValidAndInvalidSegments(getCarbonTable().isChildTable()).getValidSegments();
for (Segment segment : validSegments) {
deleteDatamapData(segment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public static boolean validateAndGetStoreBlockletWise(DataMapSchema schema) {
private void deleteDatamap() throws MalformedDataMapCommandException {
SegmentStatusManager ssm = new SegmentStatusManager(tableIdentifier);
try {
List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
List<Segment> validSegments =
ssm.getValidAndInvalidSegments(getCarbonTable().isChildTable()).getValidSegments();
for (Segment segment : validSegments) {
deleteDatamapData(segment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ class MVDataMapProvider(

@throws[IOException]
override def rebuildInternal(newLoadName: String,
segmentMap: java.util.Map[String, java.util.List[String]]): Boolean = {
segmentMap: java.util.Map[String, java.util.List[String]],
dataMapTable: CarbonTable): Boolean = {
val ctasQuery = dataMapSchema.getCtasQuery
if (ctasQuery != null) {
val identifier = dataMapSchema.getRelationIdentifier
Expand All @@ -129,11 +130,6 @@ class MVDataMapProvider(
if (isFullRefresh) {
isOverwriteTable = true
}
val dataMapTable = CarbonTable
.buildFromTablePath(identifier.getTableName,
identifier.getDatabaseName,
identifier.getTablePath,
identifier.getTableId)
// Set specified segments for incremental load
val segmentMapIterator = segmentMap.entrySet().iterator()
while (segmentMapIterator.hasNext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws
List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
Set<Segment> segmentSet = new HashSet<>(
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
context.getConfiguration()).getValidAndInvalidSegments().getValidSegments());
context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isChildTable())
.getValidSegments());
if (updateTime != null) {
CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
segmentDeleteList);
Expand Down Expand Up @@ -231,7 +232,7 @@ private String overwritePartitions(CarbonLoadModel loadModel, LoadMetadataDetail
if (partitionSpecs != null && partitionSpecs.size() > 0) {
List<Segment> validSegments =
new SegmentStatusManager(table.getAbsoluteTableIdentifier())
.getValidAndInvalidSegments().getValidSegments();
.getValidAndInvalidSegments(table.isChildTable()).getValidSegments();
String uniqueId = String.valueOf(System.currentTimeMillis());
List<String> tobeUpdatedSegs = new ArrayList<>();
List<String> tobeDeletedSegs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
readCommittedScope.getConfiguration());
SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
.getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
.getValidAndInvalidSegments(carbonTable.isChildTable(), loadMetadataDetails,
this.readCommittedScope);

// to check whether only streaming segments access is enabled or not,
// if access streaming segment is true then data will be read from streaming segments
Expand Down Expand Up @@ -523,7 +524,8 @@ public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,
table, loadMetadataDetails);
SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
new SegmentStatusManager(identifier, readCommittedScope.getConfiguration())
.getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
.getValidAndInvalidSegments(table.isChildTable(), loadMetadataDetails,
readCommittedScope);
Map<String, Long> blockRowCountMapping = new HashMap<>();
Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
new CarbonTableIdentifier("default", "cardinalityTest", "1")
)
)
val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList

if (!segments.contains("0.1")) {
// wait for 2 seconds for compaction to complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
absoluteTableIdentifier
)
val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList

if (!segments.contains("0.1")) {
assert(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
absoluteTableIdentifier)

// merged segment should not be there
val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.map(_.getSegmentNo).toList
val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.map(_.getSegmentNo).toList
assert(segments.contains("0.1"))
assert(segments.contains("2.1"))
assert(!segments.contains("2"))
Expand Down Expand Up @@ -172,7 +172,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
absoluteTableIdentifier)

// merged segment should not be there
val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.map(_.getSegmentNo).toList
val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.map(_.getSegmentNo).toList
assert(!segments.contains("0.1"))
assert(segments.contains("0.2"))
assert(!segments.contains("2"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
absoluteTableIdentifier)

val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList
// segments.foreach(seg =>
// System.out.println( "valid segment is =" + seg)
// )
Expand Down Expand Up @@ -123,7 +123,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
absoluteTableIdentifier)

// merged segment should not be there
val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.map(_.getSegmentNo).toList
val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.map(_.getSegmentNo).toList
assert(segments.contains("0.1"))
assert(!segments.contains("0.2"))
assert(!segments.contains("0"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,7 @@ object CarbonStore {
segmentId: String): Boolean = {
val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName, tableName)
val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new
SegmentStatusManager(
identifier).getValidAndInvalidSegments
SegmentStatusManager(identifier).getValidAndInvalidSegments()
validAndInvalidSegments.getValidSegments.contains(segmentId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public boolean supportRebuild() {
return dataMapFactory.supportRebuild();
}

@Override
public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
@Override public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
CarbonTable carbonTable) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ public boolean supportRebuild() {
return false;
}

@Override
public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
@Override public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
CarbonTable carbonTable) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ object IndexDataMapRebuildRDD {
): Unit = {
val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
val validAndInvalidSegments = segmentStatusManager
.getValidAndInvalidSegments(carbonTable.isChildTable)
val validSegments = validAndInvalidSegments.getValidSegments
val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
val operationContext = new OperationContext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ object CarbonDataRDDFactory {
val newEntryLoadStatus =
if (carbonLoadModel.isCarbonTransactionalTable &&
!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildTable &&
!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
LOGGER.warn("Cannot write load metadata file as there is no data to load")
SegmentStatus.MARKED_FOR_DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
.getTableName
}")
val validSegments: mutable.Buffer[Segment] = CarbonDataMergerUtil.getValidSegmentList(
carbonMainTable.getAbsoluteTableIdentifier).asScala
carbonMainTable.getAbsoluteTableIdentifier, carbonMainTable.isChildTable).asScala
val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
validSegments.foreach { segment =>
validSegmentIds += segment.getSegmentNo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object CacheUtil {
if (carbonTable.isTransactionalTable) {
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val validAndInvalidSegmentsInfo = new SegmentStatusManager(absoluteTableIdentifier)
.getValidAndInvalidSegments()
.getValidAndInvalidSegments(carbonTable.isChildTable)
// Fire a job to clear the invalid segments cached in the executors.
if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable.getDatabaseName,
carbonTable.getTableName)) {
Expand Down Expand Up @@ -106,7 +106,7 @@ object CacheUtil {

def getBloomCacheKeys(carbonTable: CarbonTable, datamap: DataMapSchema): List[String] = {
val segments = CarbonDataMergerUtil
.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala
.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier, carbonTable.isChildTable).asScala

// Generate shard Path for the datamap
val shardPaths = segments.flatMap {
Expand Down
Loading

0 comments on commit 1cd7722

Please sign in to comment.