Skip to content

Commit

Permalink
[CARBONDATA-3456] Fix data loading to the MV table when the Yarn application is killed
Browse files Browse the repository at this point in the history

Problem:
When a data load is triggered on the datamap table, a new LoadMetaDetail is created with SegmentStatus InsertInProgress together with its segment-mapping info. If the Yarn application is then killed, the stale LoadMetaDetail remains in the InsertInProgress state on the next load, and the main-table segments mapped to that LoadMetaDetail are not considered for the next load, resulting in a data mismatch between the main table and the datamap table.

Solution:
Clean up the old invalid segment before creating a new entry for new Load.

This closes #3310
  • Loading branch information
Indhumathi27 authored and ravipesala committed Jul 12, 2019
1 parent 8f0ec97 commit cdf0594
Show file tree
Hide file tree
Showing 27 changed files with 120 additions and 57 deletions.
Expand Up @@ -129,10 +129,15 @@ public boolean rebuild() throws IOException, NoSuchDataMapException {
}
String newLoadName = "";
String segmentMap = "";
AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier = AbsoluteTableIdentifier
.from(dataMapSchema.getRelationIdentifier().getTablePath(),
CarbonTable dataMapTable = CarbonTable
.buildFromTablePath(dataMapSchema.getRelationIdentifier().getTableName(),
dataMapSchema.getRelationIdentifier().getDatabaseName(),
dataMapSchema.getRelationIdentifier().getTableName());
dataMapSchema.getRelationIdentifier().getTablePath(),
dataMapSchema.getRelationIdentifier().getTableId());
AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier =
dataMapTable.getAbsoluteTableIdentifier();
// Clean up the old invalid segment data before creating a new entry for new load.
SegmentStatusManager.deleteLoadsAndUpdateMetadata(dataMapTable, false, null);
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(dataMapTableAbsoluteTableIdentifier);
Map<String, List<String>> segmentMapping = new HashMap<>();
Expand All @@ -148,6 +153,15 @@ public boolean rebuild() throws IOException, NoSuchDataMapException {
CarbonTablePath.getMetadataPath(dataMapSchema.getRelationIdentifier().getTablePath());
LoadMetadataDetails[] loadMetaDataDetails =
SegmentStatusManager.readLoadMetadata(dataMapTableMetadataPath);
// Mark all stale LoadMetadataDetails entries (in-progress but not visible) for delete
for (LoadMetadataDetails loadMetadataDetail : loadMetaDataDetails) {
if ((loadMetadataDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
|| loadMetadataDetail.getSegmentStatus()
== SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) && loadMetadataDetail.getVisibility()
.equalsIgnoreCase("false")) {
loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
}
}
List<LoadMetadataDetails> listOfLoadFolderDetails =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
Collections.addAll(listOfLoadFolderDetails, loadMetaDataDetails);
Expand Down Expand Up @@ -223,7 +237,7 @@ public boolean rebuild() throws IOException, NoSuchDataMapException {
+ " during table status updation");
}
}
return rebuildInternal(newLoadName, segmentMapping);
return rebuildInternal(newLoadName, segmentMapping, dataMapTable);
}

/**
Expand Down Expand Up @@ -395,5 +409,6 @@ public DataMapCatalog createDataMapCatalog() {

public abstract boolean supportRebuild();

public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap);
public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
CarbonTable carbonTable);
}
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -258,7 +259,7 @@ public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvali
CarbonTable carbonTable, Configuration configuration) throws IOException {
SegmentStatusManager ssm =
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration);
return ssm.getValidAndInvalidSegments();
return ssm.getValidAndInvalidSegments(carbonTable.isChildTable());
}

/**
Expand All @@ -280,4 +281,19 @@ public static List<String> getMainTableValidSegmentList(RelationIdentifier relat
return segmentList;
}

/**
 * Returns the numerically largest segment id from the given list of segment id strings.
 * Segment ids are compared as doubles so that compacted segment ids such as "12.1"
 * order correctly against load segment ids such as "12". Whole-number results are
 * returned without the trailing ".0" (e.g. "12", not "12.0").
 *
 * @param segmentList non-empty list of segment id strings parseable as doubles
 * @return the maximum segment id as a string
 * @throws IllegalArgumentException if {@code segmentList} is empty
 * @throws NumberFormatException if any entry is not a parseable double
 */
public static String getMaxSegmentID(List<String> segmentList) {
  if (segmentList.isEmpty()) {
    throw new IllegalArgumentException("segmentList must not be empty");
  }
  // Single pass max instead of copying into an array and sorting it.
  double max = Double.NEGATIVE_INFINITY;
  for (String id : segmentList) {
    max = Math.max(max, Double.parseDouble(id));
  }
  String maxId = Double.toString(max);
  if (maxId.endsWith(".0")) {
    // Strip the ".0" that Double.toString appends to whole numbers.
    maxId = maxId.substring(0, maxId.length() - 2);
  }
  return maxId;
}

}
Expand Up @@ -30,6 +30,7 @@
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;

Expand All @@ -55,14 +56,16 @@ public static boolean canDataMapBeEnabled(DataMapSchema dataMapSchema) throws IO
SegmentStatusManager.readLoadMetadata(metaDataPath);
Map<String, List<String>> dataMapSegmentMap = new HashMap<>();
for (LoadMetadataDetails loadMetadataDetail : dataMapLoadMetadataDetails) {
Map<String, List<String>> segmentMap =
DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
if (dataMapSegmentMap.isEmpty()) {
dataMapSegmentMap.putAll(segmentMap);
} else {
for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
if (null != dataMapSegmentMap.get(entry.getKey())) {
dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.SUCCESS) {
Map<String, List<String>> segmentMap =
DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
if (dataMapSegmentMap.isEmpty()) {
dataMapSegmentMap.putAll(segmentMap);
} else {
for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
if (null != dataMapSegmentMap.get(entry.getKey())) {
dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
}
}
}
}
Expand Down
Expand Up @@ -747,7 +747,7 @@ public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId
if (toBeDeleteSegments.size() > 0 || toBeUpdatedSegments.size() > 0) {
Set<Segment> segmentSet = new HashSet<>(
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
.getValidAndInvalidSegments().getValidSegments());
.getValidAndInvalidSegments(carbonTable.isChildTable()).getValidSegments());
CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
Segment.toSegmentList(toBeDeleteSegments, null),
Segment.toSegmentList(toBeUpdatedSegments, null), uuid);
Expand Down
Expand Up @@ -103,13 +103,18 @@ public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier identi
}

public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
return getValidAndInvalidSegments(null, null);
return getValidAndInvalidSegments(false, null, null);
}

/**
 * Returns the valid and invalid segments of this table, reading the load metadata from
 * the table status file (no pre-read details or read-committed scope supplied).
 *
 * @param isChildTable true when the table is a datamap (MV) child table; in that case
 *                     segments whose data and index sizes are both "0" are excluded
 *                     from the valid-segment list
 * @throws IOException if the table status metadata cannot be read
 */
public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable)
    throws IOException {
  return getValidAndInvalidSegments(isChildTable, null, null);
}

/**
* get valid segment for given load status details.
*/
public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(Boolean isChildTable,
LoadMetadataDetails[] loadMetadataDetails, ReadCommittedScope readCommittedScope)
throws IOException {

Expand Down Expand Up @@ -162,9 +167,21 @@ public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
continue;
}
listOfValidSegments.add(
new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
segment));
// In the case of a child table, if no record is loaded to the segment during loading,
// the segmentStatus is still marked as 'Success'. During query, such a segment must not
// be added to the valid-segment list, as the segment data does not exist.
if (isChildTable) {
if (!segment.getDataSize().equalsIgnoreCase("0") && !segment.getIndexSize()
.equalsIgnoreCase("0")) {
listOfValidSegments.add(
new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
segment));
}
} else {
listOfValidSegments.add(
new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope,
segment));
}
} else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
|| SegmentStatus.COMPACTED == segment.getSegmentStatus()
|| SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
Expand Down
Expand Up @@ -3143,7 +3143,7 @@ public static ColumnarFormatVersion getFormatVersion(CarbonTable carbonTable) th
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
segmentStatusManager.getValidAndInvalidSegments();
segmentStatusManager.getValidAndInvalidSegments(carbonTable.isChildTable());
List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
if (validSegments.isEmpty()) {
return carbonProperties.getFormatVersion();
Expand Down
Expand Up @@ -374,7 +374,8 @@ public void deleteDatamapData() {
SegmentStatusManager ssm =
new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
try {
List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
List<Segment> validSegments =
ssm.getValidAndInvalidSegments(getCarbonTable().isChildTable()).getValidSegments();
for (Segment segment : validSegments) {
deleteDatamapData(segment);
}
Expand Down
Expand Up @@ -179,7 +179,8 @@ public static boolean validateAndGetStoreBlockletWise(DataMapSchema schema) {
private void deleteDatamap() throws MalformedDataMapCommandException {
SegmentStatusManager ssm = new SegmentStatusManager(tableIdentifier);
try {
List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
List<Segment> validSegments =
ssm.getValidAndInvalidSegments(getCarbonTable().isChildTable()).getValidSegments();
for (Segment segment : validSegments) {
deleteDatamapData(segment);
}
Expand Down
Expand Up @@ -103,7 +103,8 @@ class MVDataMapProvider(

@throws[IOException]
override def rebuildInternal(newLoadName: String,
segmentMap: java.util.Map[String, java.util.List[String]]): Boolean = {
segmentMap: java.util.Map[String, java.util.List[String]],
dataMapTable: CarbonTable): Boolean = {
val ctasQuery = dataMapSchema.getCtasQuery
if (ctasQuery != null) {
val identifier = dataMapSchema.getRelationIdentifier
Expand All @@ -129,11 +130,6 @@ class MVDataMapProvider(
if (isFullRefresh) {
isOverwriteTable = true
}
val dataMapTable = CarbonTable
.buildFromTablePath(identifier.getTableName,
identifier.getDatabaseName,
identifier.getTablePath,
identifier.getTableId)
// Set specified segments for incremental load
val segmentMapIterator = segmentMap.entrySet().iterator()
while (segmentMapIterator.hasNext) {
Expand Down
Expand Up @@ -580,15 +580,17 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
loadDataToFactTable("test_table")
sql("drop datamap if exists datamap1")
sql("create datamap datamap_com using 'mv' as select empname, designation from test_table")
for (i <- 0 to 4) {
for (i <- 0 to 16) {
loadDataToFactTable("test_table")
}
createTableFactTable("test_table1")
for (i <- 0 to 5) {
for (i <- 0 to 17) {
loadDataToFactTable("test_table1")
}
checkAnswer(sql("select empname, designation from test_table"),
sql("select empname, designation from test_table1"))
val result = sql("show datamap on table test_table").collectAsList()
assert(result.get(0).get(5).toString.contains("\"default.test_table\":\"12.1\""))
val df = sql(s""" select empname, designation from test_table""".stripMargin)
val analyzed = df.queryExecution.analyzed
assert(TestUtil.verifyMVDataMap(analyzed, "datamap_com"))
Expand Down
Expand Up @@ -196,7 +196,8 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws
List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
Set<Segment> segmentSet = new HashSet<>(
new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
context.getConfiguration()).getValidAndInvalidSegments().getValidSegments());
context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isChildTable())
.getValidSegments());
if (updateTime != null) {
CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
segmentDeleteList);
Expand Down Expand Up @@ -231,7 +232,7 @@ private String overwritePartitions(CarbonLoadModel loadModel, LoadMetadataDetail
if (partitionSpecs != null && partitionSpecs.size() > 0) {
List<Segment> validSegments =
new SegmentStatusManager(table.getAbsoluteTableIdentifier())
.getValidAndInvalidSegments().getValidSegments();
.getValidAndInvalidSegments(table.isChildTable()).getValidSegments();
String uniqueId = String.valueOf(System.currentTimeMillis());
List<String> tobeUpdatedSegs = new ArrayList<>();
List<String> tobeDeletedSegs = new ArrayList<>();
Expand Down
Expand Up @@ -122,7 +122,8 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
readCommittedScope.getConfiguration());
SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
.getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
.getValidAndInvalidSegments(carbonTable.isChildTable(), loadMetadataDetails,
this.readCommittedScope);

// to check whether only streaming segments access is enabled or not,
// if access streaming segment is true then data will be read from streaming segments
Expand Down Expand Up @@ -523,7 +524,8 @@ public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,
table, loadMetadataDetails);
SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
new SegmentStatusManager(identifier, readCommittedScope.getConfiguration())
.getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
.getValidAndInvalidSegments(table.isChildTable(), loadMetadataDetails,
readCommittedScope);
Map<String, Long> blockRowCountMapping = new HashMap<>();
Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();

Expand Down
Expand Up @@ -135,8 +135,8 @@ public boolean supportRebuild() {
return dataMapFactory.supportRebuild();
}

@Override
public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
@Override public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
CarbonTable carbonTable) {
return false;
}
}
Expand Up @@ -112,8 +112,8 @@ public boolean supportRebuild() {
return false;
}

@Override
public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
@Override public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap,
CarbonTable carbonTable) {
return false;
}
}
Expand Up @@ -79,7 +79,8 @@ object IndexDataMapRebuildRDD {
): Unit = {
val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
val validAndInvalidSegments = segmentStatusManager
.getValidAndInvalidSegments(carbonTable.isChildTable)
val validSegments = validAndInvalidSegments.getValidSegments
val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
val operationContext = new OperationContext()
Expand Down
Expand Up @@ -518,6 +518,7 @@ object CarbonDataRDDFactory {
val newEntryLoadStatus =
if (carbonLoadModel.isCarbonTransactionalTable &&
!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildTable &&
!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
LOGGER.warn("Cannot write load metadata file as there is no data to load")
SegmentStatus.MARKED_FOR_DELETE
Expand Down
Expand Up @@ -85,7 +85,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
.getTableName
}")
val validSegments: mutable.Buffer[Segment] = CarbonDataMergerUtil.getValidSegmentList(
carbonMainTable.getAbsoluteTableIdentifier).asScala
carbonMainTable.getAbsoluteTableIdentifier, carbonMainTable.isChildTable).asScala
val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
validSegments.foreach { segment =>
validSegmentIds += segment.getSegmentNo
Expand Down
Expand Up @@ -51,7 +51,7 @@ object CacheUtil {
if (carbonTable.isTransactionalTable) {
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val validAndInvalidSegmentsInfo = new SegmentStatusManager(absoluteTableIdentifier)
.getValidAndInvalidSegments()
.getValidAndInvalidSegments(carbonTable.isChildTable)
// Fire a job to clear the invalid segments cached in the executors.
if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable.getDatabaseName,
carbonTable.getTableName)) {
Expand Down Expand Up @@ -111,7 +111,7 @@ object CacheUtil {

def getBloomCacheKeys(carbonTable: CarbonTable, datamap: DataMapSchema): List[String] = {
val segments = CarbonDataMergerUtil
.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala
.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier, carbonTable.isChildTable).asScala

// Generate shard Path for the datamap
val shardPaths = segments.flatMap {
Expand Down
Expand Up @@ -109,7 +109,8 @@ object DropCacheBloomEventListener extends OperationEventListener {
val datamaps = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
.asScala.toList
val segments = CarbonDataMergerUtil
.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala.toList
.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier, carbonTable.isChildTable)
.asScala.toList

datamaps.foreach {
case datamap if datamap.getProviderName
Expand Down
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.types.StringType

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil}
import org.apache.carbondata.core.datamap.status.{DataMapSegmentStatusUtil, DataMapStatus, DataMapStatusManager}
import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
Expand Down Expand Up @@ -125,7 +125,8 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
val iterator = segmentMaps.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
syncInfoMap.put(entry.getKey, entry.getValue.get(entry.getValue.size() - 1))

syncInfoMap.put(entry.getKey, DataMapUtil.getMaxSegmentID(entry.getValue))
}
val loadEndTime =
if (loadMetadataDetails(i).getLoadEndTime ==
Expand Down

0 comments on commit cdf0594

Please sign in to comment.