Skip to content

Commit

Permalink
[CARBONDATA-3393] Merge Index Job Failure should not fail the LOAD. L…
Browse files Browse the repository at this point in the history
…oad status should not consider the merge index job status.
  • Loading branch information
dhatchayani committed May 20, 2019
1 parent 24fe230 commit 10fb74c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 13 deletions.
Expand Up @@ -139,12 +139,30 @@ public static String genSegmentFileName(String segmentId, String UUID) {
*/
public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID)
throws IOException {
return writeSegmentFile(carbonTable, segmentId, UUID, null);
}

/**
* Write segment file to the metadata folder of the table selecting only the current load files
*
* @param carbonTable CarbonTable
* @param segmentId segment id
* @param UUID a UUID string used to construct the segment file name
* @return segment file name
*/
public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
final String currentLoadTimeStamp) throws IOException {
String tablePath = carbonTable.getTablePath();
boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
if (null != currentLoadTimeStamp) {
return (file.getName().contains(currentLoadTimeStamp)) && (
file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
}
return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
}
Expand Down
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.rdd

import org.apache.log4j.Logger
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.SparkSession

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
Expand All @@ -38,6 +40,8 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)

object CarbonMergeFilesRDD {

private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)

/**
* Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment
*
Expand Down Expand Up @@ -70,9 +74,7 @@ object CarbonMergeFilesRDD {
readFileFooterFromCarbonDataFile).collect()
} else {
try {
if (CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
if (isMergeIndexPropertySet) {
new CarbonMergeFilesRDD(
sparkSession,
carbonTable,
Expand All @@ -82,19 +84,34 @@ object CarbonMergeFilesRDD {
readFileFooterFromCarbonDataFile).collect()
}
} catch {
case _: Exception =>
if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
new CarbonMergeFilesRDD(
sparkSession,
carbonTable,
segmentIds,
segmentFileNameToSegmentIdMap,
carbonTable.isHivePartitionTable,
readFileFooterFromCarbonDataFile).collect()
}
case ex: Exception =>
val message = "Merge Index files request is failed " +
s"for table ${ carbonTable.getTableUniqueName }" + ex.getMessage
LOGGER.error(message)
ex.printStackTrace()
throw new RuntimeException(message)
}
}
}

/**
* Check whether the Merge Index Property is set by the user.
* If not set, take the default value of the property.
*
* @return
*/
def isMergeIndexPropertySet: Boolean = {
var mergeIndex: Boolean = false
try {
mergeIndex = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean
} catch {
case _ =>
mergeIndex = CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean
}
mergeIndex
}
}

/**
Expand Down

0 comments on commit 10fb74c

Please sign in to comment.