Commit 85db7d9: Merge 519ea86 into 7916aa6
kunal642 committed Jan 24, 2019
2 parents: 7916aa6 + 519ea86
Showing 4 changed files with 45 additions and 26 deletions.
CarbonIndexFileMergeWriter.java

@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -39,6 +40,7 @@
 import org.apache.carbondata.format.MergedBlockIndexHeader;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 
 public class CarbonIndexFileMergeWriter {
 
@@ -52,6 +54,8 @@ public class CarbonIndexFileMergeWriter {
    */
   private ThriftWriter thriftWriter;
 
+  private Logger LOGGER = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
+
   public CarbonIndexFileMergeWriter(CarbonTable table) {
     this.table = table;
   }
@@ -68,27 +72,32 @@ public CarbonIndexFileMergeWriter(CarbonTable table) {
    */
   private String mergeCarbonIndexFilesOfSegment(String segmentId,
       String tablePath, List<String> indexFileNamesTobeAdded,
-      boolean readFileFooterFromCarbonDataFile, String uuid) throws IOException {
-    Segment segment = Segment.getSegment(segmentId, tablePath);
-    String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
-    CarbonFile[] indexFiles;
-    SegmentFileStore sfs = null;
-    if (segment != null && segment.getSegmentFileName() != null) {
-      sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
-      List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
-      indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
-    } else {
-      indexFiles =
-          SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
-    }
-    if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
-      if (sfs == null) {
-        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
-            readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId);
-      } else {
-        return writeMergeIndexFileBasedOnSegmentFile(
-            segmentId, indexFileNamesTobeAdded, sfs, indexFiles, uuid);
-      }
-    }
+      boolean readFileFooterFromCarbonDataFile, String uuid) {
+    try {
+      Segment segment = Segment.getSegment(segmentId, tablePath);
+      String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+      CarbonFile[] indexFiles;
+      SegmentFileStore sfs = null;
+      if (segment != null && segment.getSegmentFileName() != null) {
+        sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+        List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
+        indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+      } else {
+        indexFiles =
+            SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
+      }
+      if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
+        if (sfs == null) {
+          return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+              readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId);
+        } else {
+          return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded, sfs,
+              indexFiles, uuid);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error(
+          "Failed to merge index files in path: " + tablePath, e);
+    }
     return null;
   }
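With this hunk, mergeCarbonIndexFilesOfSegment no longer declares throws IOException: any failure during the merge is caught, logged through the new log4j LOGGER, and reported to callers as a null return. Below is a minimal, runnable sketch of that catch-log-return-null shape; SoftFailMergeDemo, MergeStep, and mergeQuietly are hypothetical names, and java.util.logging stands in for log4j so the sketch stays dependency-free.

import java.io.IOException;
import java.util.logging.Logger;

public class SoftFailMergeDemo {
  private static final Logger LOGGER = Logger.getLogger(SoftFailMergeDemo.class.getName());

  interface MergeStep {
    String run() throws IOException;
  }

  // Before the change, the IOException propagated to the caller; after it,
  // the exception is logged and null signals "no merge file was written".
  static String mergeQuietly(MergeStep step, String tablePath) {
    try {
      return step.run();
    } catch (Exception e) {
      LOGGER.severe("Failed to merge index files in path: " + tablePath + " - " + e);
      return null;
    }
  }

  public static void main(String[] args) {
    String merged = mergeQuietly(() -> { throw new IOException("disk full"); }, "/tmp/t1");
    System.out.println("merge result: " + merged); // prints "merge result: null"
  }
}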
CarbonDataRDDFactory.scala

@@ -579,16 +579,19 @@ object CarbonDataRDDFactory
         }
         val compactedSegments = new util.ArrayList[String]()
         handleSegmentMerging(sqlContext,
-          carbonLoadModel,
+          carbonLoadModel
+            .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
           carbonTable,
           compactedSegments,
           operationContext)
         carbonLoadModel.setMergedSegmentIds(compactedSegments)
         writtenSegment
       } catch {
         case e: Exception =>
-          throw new Exception(
-            "Dataload is success. Auto-Compaction has failed. Please check logs.")
+          LOGGER.error(
+            "Auto-Compaction has failed. Ignoring this exception because the" +
+            " load is passed.", e)
+          writtenSegment
       }
     }
   }
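The catch branch now logs and returns writtenSegment instead of throwing, so a failed auto-compaction no longer turns an already-successful load into an error. A small runnable sketch of that two-phase, best-effort flow; AutoCompactionFlow and its methods are hypothetical, with java.util.logging again standing in for the project's logger.

import java.util.logging.Logger;

public class AutoCompactionFlow {
  private static final Logger LOGGER = Logger.getLogger(AutoCompactionFlow.class.getName());

  static String loadSegment() { return "segment_0"; }  // phase 1: the load itself
  static void autoCompact() { throw new IllegalStateException("compaction failed"); } // phase 2: best effort

  static String loadWithBestEffortCompaction() {
    String writtenSegment = loadSegment();
    try {
      autoCompact();
    } catch (Exception e) {
      LOGGER.severe("Auto-Compaction has failed. Ignoring this exception because the load is passed. " + e);
    }
    return writtenSegment; // the load still reports success
  }

  public static void main(String[] args) {
    System.out.println("loaded: " + loadWithBestEffortCompaction());
  }
}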
CarbonLoadDataCommand.scala

@@ -874,16 +874,17 @@ case class CarbonLoadDataCommand(
         // Trigger auto compaction
         CarbonDataRDDFactory.handleSegmentMerging(
           sparkSession.sqlContext,
-          carbonLoadModel,
+          carbonLoadModel
+            .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
           table,
           compactedSegments,
           operationContext)
         carbonLoadModel.setMergedSegmentIds(compactedSegments)
       } catch {
         case e: Exception =>
-          throw new Exception(
-            "Dataload is success. Auto-Compaction has failed. Please check logs.",
-            e)
+          LOGGER.error(
+            "Auto-Compaction has failed. Ignoring this exception because the " +
+            "load is passed.", e)
       }
       val specs =
         SegmentFileStore.getPartitionSpecs(carbonLoadModel.getSegmentId, carbonLoadModel.getTablePath)
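As in CarbonDataRDDFactory above, handleSegmentMerging is now handed a copy built by getCopyWithPartition(csvHeader, csvDelimiter) rather than the live carbonLoadModel, while the merged segment ids are still written back to the original afterwards. A sketch of that shape under hypothetical types (CopyForCompaction, LoadModel); the commit may have motives beyond the isolation shown here.

import java.util.ArrayList;
import java.util.List;

public class CopyForCompaction {
  static class LoadModel {
    String csvHeader = "id,name";
    String csvDelimiter = ",";
    List<String> mergedSegmentIds = new ArrayList<>();

    LoadModel copyWithPartition(String header, String delimiter) {
      LoadModel copy = new LoadModel();
      copy.csvHeader = header;       // every field the merge needs must be copied over
      copy.csvDelimiter = delimiter;
      return copy;
    }
  }

  static void handleSegmentMerging(LoadModel model, List<String> compactedSegments) {
    model.csvHeader = "scratch";     // mutation stays on the copy
    compactedSegments.add("0.1");    // result is communicated via the out-parameter
  }

  public static void main(String[] args) {
    LoadModel original = new LoadModel();
    List<String> compacted = new ArrayList<>();
    handleSegmentMerging(original.copyWithPartition(original.csvHeader, original.csvDelimiter), compacted);
    original.mergedSegmentIds.addAll(compacted); // the setMergedSegmentIds equivalent
    System.out.println(original.csvHeader + " -> merged: " + original.mergedSegmentIds);
  }
}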
CarbonLoadModel.java

@@ -488,6 +488,9 @@ public CarbonLoadModel getCopyWithTaskNo(String taskNo) {
     copy.parentTablePath = parentTablePath;
     copy.sdkWriterCores = sdkWriterCores;
     copy.columnCompressor = columnCompressor;
+    copy.rangePartitionColumn = rangePartitionColumn;
+    copy.scaleFactor = scaleFactor;
+    copy.totalSize = totalSize;
     return copy;
   }
 
@@ -544,6 +547,9 @@ public CarbonLoadModel getCopyWithPartition(String header, String delimiter) {
     copyObj.parentTablePath = parentTablePath;
     copyObj.sdkWriterCores = sdkWriterCores;
     copyObj.columnCompressor = columnCompressor;
+    copyObj.rangePartitionColumn = rangePartitionColumn;
+    copyObj.scaleFactor = scaleFactor;
+    copyObj.totalSize = totalSize;
     return copyObj;
   }
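Both copy methods now carry over rangePartitionColumn, scaleFactor, and totalSize, which were previously dropped whenever a model was cloned. A minimal sketch, with a hypothetical LoadModel, of the failure mode such an omission causes: the copy silently reverts the missed field to its default.

public class CopyDriftDemo {
  static class LoadModel {
    String columnCompressor = "snappy";
    long totalSize = 1024L;           // a newer field

    LoadModel copyWithTaskNo(String taskNo) {
      LoadModel copy = new LoadModel();
      copy.columnCompressor = columnCompressor;
      // copy.totalSize = totalSize;  // the kind of missing line this commit adds back
      return copy;
    }
  }

  public static void main(String[] args) {
    LoadModel original = new LoadModel();
    original.totalSize = 4096L;
    LoadModel copy = original.copyWithTaskNo("1");
    // Prints 1024, not 4096: the copy silently reverted to the field default.
    System.out.println(copy.totalSize);
  }
}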
