[CARBONDATA-3040][BloomDataMap] Fix bug for merging bloom index
Problem
There is a bug that causes query failure when two bloom datamaps are created on the same table that already contains data.

Analysis
Since the table already contains data, each create datamap statement triggers a rebuild-datamap task, which in turn triggers bloom index file merging. Debugging showed that the first datamap's bloom index files were merged twice, and the second merge left the bloom index files empty.

The procedure is as follows (a minimal reproduction sketch is given right after the list):

  1. create table
  2. load data
  3. create bloom datamap1: datamap1 is rebuilt for the existing data, and the event listener is triggered to merge index files for all bloom datamaps (currently only datamap1)
  4. create bloom datamap2: datamap2 is rebuilt for the existing data, and the event listener is triggered to merge index files for all bloom datamaps (currently datamap1 and datamap2)
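
A minimal reproduction sketch of these steps in Scala, assuming a CarbonData-enabled SparkSession named spark; the table name, columns and index columns are illustrative, not taken from the original report:

  // assumes `spark` is a SparkSession with CarbonData integration (e.g. a CarbonSession)
  spark.sql("CREATE TABLE bloom_test (id INT, city STRING, age INT) STORED AS carbondata")
  spark.sql("LOAD DATA INPATH '/tmp/sample.csv' INTO TABLE bloom_test")

  // both datamaps are created after data is loaded, so each CREATE DATAMAP statement
  // rebuilds the datamap for the existing segment and triggers bloom index file merging
  spark.sql("CREATE DATAMAP dm1 ON TABLE bloom_test USING 'bloomfilter' " +
    "DMPROPERTIES('INDEX_COLUMNS'='city')")
  spark.sql("CREATE DATAMAP dm2 ON TABLE bloom_test USING 'bloomfilter' " +
    "DMPROPERTIES('INDEX_COLUMNS'='age')")

  // before this fix, dm1's merged bloom index files ended up empty at this point,
  // so a filter query on its indexed column could fail
  spark.sql("SELECT count(*) FROM bloom_test WHERE city = 'shenzhen'").show()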

Because the event carries no information about which datamap was rebuilt, the listener always merges the index files of every bloom datamap. As a result, datamap1's bloom index files are merged twice; by the time of the second merge only the mergeShard folder remains, so there are no input files to merge and the final merged bloom index files are empty.

Solution
Send the datamap name in the rebuild event so the listener can filter on it and merge bloom index files only for that specific datamap. Also check whether mergeShard already exists before merging (a sketch of this guard follows below).
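
The caller-side mergeShard pre-check mentioned above is not fully visible in the hunks below, so here is a minimal sketch of the idea in Scala. The helper name, parameter list and the package path of BloomIndexFileStore are illustrative assumptions, not the exact code of this patch; FileFactory.isFileExist and mergeBloomIndexFile are used with the signatures that appear in the diff.

  import java.util
  import org.apache.carbondata.core.datastore.impl.FileFactory
  import org.apache.carbondata.datamap.bloom.BloomIndexFileStore  // package path assumed

  // Sketch only: merge a datamap's bloom index files for one segment unless a
  // completed merge already exists there.
  def mergeBloomIndexIfNeeded(dmSegmentPath: String,
      mergeShardPath: String,
      inProgressFlagPath: String,
      indexCols: util.List[String]): Unit = {
    // a finished merge leaves the mergeShard folder behind with no in-progress flag;
    // merging it again would find no shard input and produce empty index files
    val alreadyMerged =
      FileFactory.isFileExist(mergeShardPath) && !FileFactory.isFileExist(inProgressFlagPath)
    if (!alreadyMerged) {
      BloomIndexFileStore.mergeBloomIndexFile(dmSegmentPath, indexCols)
    }
  }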

This closes #2851
kevinjmh authored and xuchuanyin committed Oct 25, 2018
1 parent e4806b9 commit 33a6dc2
Showing 6 changed files with 35 additions and 11 deletions.
@@ -60,6 +60,9 @@ public class BloomIndexFileStore {


public static void mergeBloomIndexFile(String dmSegmentPathString, List<String> indexCols) {

// Step 1. check current folders

// get all shard paths of old store
CarbonFile segmentPath = FileFactory.getCarbonFile(dmSegmentPathString,
FileFactory.getFileType(dmSegmentPathString));
@@ -72,6 +75,9 @@ public boolean accept(CarbonFile file) {

String mergeShardPath = dmSegmentPathString + File.separator + MERGE_BLOOM_INDEX_SHARD_NAME;
String mergeInprogressFile = dmSegmentPathString + File.separator + MERGE_INPROGRESS_FILE;

// Step 2. prepare for fail-safe merging

try {
// delete mergeShard folder if exists
if (FileFactory.isFileExist(mergeShardPath)) {
@@ -87,10 +93,12 @@ public boolean accept(CarbonFile file) {
throw new RuntimeException("Failed to create directory " + mergeShardPath);
}
} catch (IOException e) {
LOGGER.error("Error occurs while create directory " + mergeShardPath, e);
throw new RuntimeException("Error occurs while create directory " + mergeShardPath);
throw new RuntimeException(e);
}

// Step 3. merge index files
// Query won't use mergeShard until MERGE_INPROGRESS_FILE is deleted

// for each index column, merge the bloomindex files from all shards into one
for (String indexCol: indexCols) {
String mergeIndexFile = getMergeBloomIndexFile(mergeShardPath, indexCol);
@@ -115,15 +123,17 @@ public boolean accept(CarbonFile file) {
}
} catch (IOException e) {
LOGGER.error("Error occurs while merge bloom index file of column: " + indexCol, e);
// delete merge shard of bloom index for this segment when failed
// if any column failed, delete merge shard for this segment and exit
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeShardPath));
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeInprogressFile));
throw new RuntimeException(
"Error occurs while merge bloom index file of column: " + indexCol);
"Error occurs while merge bloom index file of column: " + indexCol, e);
} finally {
CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
}
}
// delete flag file and mergeShard can be used

// Step 4. delete flag file and mergeShard can be used
try {
FileFactory.deleteFile(mergeInprogressFile, FileFactory.getFileType(mergeInprogressFile));
} catch (IOException e) {
@@ -58,9 +58,17 @@ case class BuildDataMapPreExecutionEvent(sparkSession: SparkSession,
/**
* For handling operations after the index build finishes over a table with an index datamap
* example: bloom datamap, Lucene datamap
*
* @param sparkSession
* @param identifier
* @param dmName name of the datamap being rebuilt in the rebuild process;
* set to null for loading and compaction, in which case all datamaps are handled
* @param segmentIdList
* @param isFromRebuild set to false in the loading process so that lazy datamaps are skipped
*/
case class BuildDataMapPostExecutionEvent(sparkSession: SparkSession,
identifier: AbsoluteTableIdentifier, segmentIdList: Seq[String], isFromRebuild: Boolean)
identifier: AbsoluteTableIdentifier, dmName: String,
segmentIdList: Seq[String], isFromRebuild: Boolean)
extends Event with TableEventInfo

/**
@@ -133,7 +133,7 @@ object IndexDataMapRebuildRDD {
}

val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
tableIdentifier, validSegments.asScala.map(_.getSegmentNo), true)
tableIdentifier, schema.getDataMapName, validSegments.asScala.map(_.getSegmentNo), true)
OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
}
}
@@ -289,7 +289,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
if (null != tableDataMaps) {
val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(
sqlContext.sparkSession, carbonTable.getAbsoluteTableIdentifier,
Seq(carbonLoadModel.getSegmentId), true)
null, Seq(mergedLoadNumber), true)
OperationListenerBus.getInstance()
.fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
}
@@ -48,8 +48,14 @@ class MergeBloomIndexEventListener extends OperationEventListener with Logging {
_.getDataMapSchema.getProviderName.equalsIgnoreCase(
DataMapClassProvider.BLOOMFILTER.getShortName))

// for load process, filter lazy datamap
if (!datamapPostEvent.isFromRebuild) {
if (datamapPostEvent.isFromRebuild) {
if (null != datamapPostEvent.dmName) {
// for rebuild process
bloomDatamaps = bloomDatamaps.filter(
_.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
}
} else {
// for load process, skip lazy datamap
bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy)
}

@@ -332,7 +332,7 @@ case class CarbonLoadDataCommand(
OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
if (tableDataMaps.size() > 0) {
val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession,
table.getAbsoluteTableIdentifier, Seq(carbonLoadModel.getSegmentId), false)
table.getAbsoluteTableIdentifier, null, Seq(carbonLoadModel.getSegmentId), false)
OperationListenerBus.getInstance()
.fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
}