Commit
Merge 4df0ea2 into 2980803
kumarvishal09 committed Mar 21, 2019
2 parents 2980803 + 4df0ea2 commit 4d84dd2
Showing 32 changed files with 763 additions and 631 deletions.
@@ -473,6 +473,15 @@ public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments
* @param identifier Table identifier
*/
public void clearDataMaps(AbsoluteTableIdentifier identifier) {
clearDataMaps(identifier, true);
}

/**
* Clear the datamap/datamaps of a table from memory
*
* @param identifier Table identifier
*/
public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean launchJob) {
CarbonTable carbonTable = getCarbonTable(identifier);
String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
@@ -483,7 +492,7 @@ public void clearDataMaps(AbsoluteTableIdentifier identifier) {
tableIndices = allDataMaps.get(tableUniqueName);
}
}
if (null != carbonTable && tableIndices != null) {
if (null != carbonTable && tableIndices != null && launchJob) {
try {
DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
} catch (IOException e) {
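The new boolean overload lets a caller decide whether clearing a table's datamaps should also launch the distributed clear-datamap job or only drop local in-memory state. A minimal sketch of a caller, assuming the usual DataMapStoreManager singleton and package locations; the helper class and the shutdown scenario are illustrative, not part of this commit:

import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

class DataMapCacheCleaner {
  // Hypothetical helper: on a normal drop-table the clear-datamap job runs
  // cluster-wide (launchJob = true); during a local JVM shutdown only this
  // JVM's cache is evicted (launchJob = false) and the job is skipped.
  static void clear(AbsoluteTableIdentifier identifier, boolean clusterWide) {
    DataMapStoreManager.getInstance().clearDataMaps(identifier, clusterWide);
  }
}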
@@ -110,6 +110,7 @@ public RecordReader<Void, ExtendedBlocklet> createRecordReader(InputSplit inputS
distributable.getDistributable(),
dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
for (ExtendedBlocklet blocklet : blocklets) {
blocklet.getDetailInfo();
blocklet.setDataMapUniqueId(distributable.getUniqueId());
}
blockletIterator = blocklets.iterator();
core/src/main/java/org/apache/carbondata/core/datamap/Segment.java (24 changes: 19 additions & 5 deletions)
@@ -62,13 +62,16 @@ public class Segment implements Serializable {
*/
private LoadMetadataDetails loadMetadataDetails;

private String segmentString;

public Segment(String segmentNo) {
this.segmentNo = segmentNo;
}

public Segment(String segmentNo, ReadCommittedScope readCommittedScope) {
this.segmentNo = segmentNo;
this.readCommittedScope = readCommittedScope;
segmentString = segmentNo;
}

/**
@@ -82,6 +85,11 @@ public Segment(String segmentNo, String segmentFileName) {
this.segmentNo = segmentNo;
this.segmentFileName = segmentFileName;
this.readCommittedScope = null;
if (segmentFileName != null) {
segmentString = segmentNo + "#" + segmentFileName;
} else {
segmentString = segmentNo;
}
}

/**
@@ -94,6 +102,11 @@ public Segment(String segmentNo, String segmentFileName, ReadCommittedScope read
this.segmentNo = segmentNo;
this.segmentFileName = segmentFileName;
this.readCommittedScope = readCommittedScope;
if (segmentFileName != null) {
segmentString = segmentNo + "#" + segmentFileName;
} else {
segmentString = segmentNo;
}
}

/**
@@ -107,6 +120,11 @@ public Segment(String segmentNo, String segmentFileName, ReadCommittedScope read
this.segmentFileName = segmentFileName;
this.readCommittedScope = readCommittedScope;
this.loadMetadataDetails = loadMetadataDetails;
if (segmentFileName != null) {
segmentString = segmentNo + "#" + segmentFileName;
} else {
segmentString = segmentNo;
}
}

/**
@@ -233,11 +251,7 @@ public void setFilteredIndexShardName(String filteredIndexShardName) {
}

@Override public String toString() {
if (segmentFileName != null) {
return segmentNo + "#" + segmentFileName;
} else {
return segmentNo;
}
return segmentString;
}

public LoadMetadataDetails getLoadMetadataDetails() {
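The net effect of the Segment.java changes above is that the "segmentNo#segmentFileName" string is now built once, in the constructors, and toString() degrades to a field read, which matters when segments are stringified repeatedly during pruning. A self-contained sketch of the same precompute-in-constructor pattern; the class name here is illustrative:

final class SegmentRef {
  private final String segmentNo;
  private final String segmentFileName;  // may be null
  private final String segmentString;    // precomputed toString() value

  SegmentRef(String segmentNo, String segmentFileName) {
    this.segmentNo = segmentNo;
    this.segmentFileName = segmentFileName;
    // same "segmentNo#segmentFileName" shape as in the diff above
    this.segmentString = (segmentFileName != null)
        ? segmentNo + "#" + segmentFileName
        : segmentNo;
  }

  @Override public String toString() {
    return segmentString;  // plain field read, no per-call concatenation
  }
}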
@@ -125,7 +125,7 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, Expression filterExp
}
blocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
segment.toString()));
segment));
}
return blocklets;
}
@@ -146,15 +146,11 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolver
final List<ExtendedBlocklet> blocklets = new ArrayList<>();
final Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
// for non-filter queries
if (filterExp == null) {
// if filter is not passed, then return all the blocklets.
return pruneWithoutFilter(segments, partitions, blocklets);
}
// for filter queries
int totalFiles = 0;
int datamapsCount = 0;
for (Segment segment : segments) {
for (DataMap dataMap : dataMaps.get(segment)) {
for (DataMap dataMap: dataMaps.get(segment)) {
totalFiles += dataMap.getNumberOfEntries();
datamapsCount++;
}
@@ -166,11 +162,16 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolver
// As 0.1 million files block pruning can take only 1 second.
// Doing multi-thread for smaller values is not recommended as
// driver should have minimum threads opened to support multiple concurrent queries.
if (filterExp == null) {
// if filter is not passed, then return all the blocklets.
return pruneWithoutFilter(segments, partitions, blocklets);
}
return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps);
}
// handle by multi-thread
return pruneWithFilterMultiThread(segments, filterExp, partitions, blocklets, dataMaps,
totalFiles);
List<ExtendedBlocklet> extendedBlocklets =
pruneMultiThread(segments, filterExp, partitions, blocklets, dataMaps, totalFiles);
return extendedBlocklets;
}

private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
@@ -179,7 +180,7 @@ private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
List<Blocklet> allBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
blocklets.addAll(
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(allBlocklets, segment),
segment.toString()));
segment));
}
return blocklets;
}
@@ -195,12 +196,12 @@ private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
}
blocklets.addAll(
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
segment.toString()));
segment));
}
return blocklets;
}

private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments,
private List<ExtendedBlocklet> pruneMultiThread(List<Segment> segments,
final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
int totalFiles) {
@@ -277,7 +278,8 @@ private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments
throw new RuntimeException(" not all the files processed ");
}
List<Future<Void>> results = new ArrayList<>(numOfThreadsForPruning);
final Map<Segment, List<Blocklet>> prunedBlockletMap = new ConcurrentHashMap<>(segments.size());
final Map<Segment, List<ExtendedBlocklet>> prunedBlockletMap =
new ConcurrentHashMap<>(segments.size());
final ExecutorService executorService = Executors.newFixedThreadPool(numOfThreadsForPruning);
final String threadName = Thread.currentThread().getName();
for (int i = 0; i < numOfThreadsForPruning; i++) {
@@ -286,16 +288,22 @@ private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments
@Override public Void call() throws IOException {
Thread.currentThread().setName(threadName);
for (SegmentDataMapGroup segmentDataMapGroup : segmentDataMapGroups) {
List<Blocklet> pruneBlocklets = new ArrayList<>();
List<ExtendedBlocklet> pruneBlocklets = new ArrayList<>();
List<DataMap> dataMapList = dataMaps.get(segmentDataMapGroup.getSegment());
SegmentProperties segmentProperties =
segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0));
Segment segment = segmentDataMapGroup.getSegment();
for (int i = segmentDataMapGroup.getFromIndex();
i <= segmentDataMapGroup.getToIndex(); i++) {
pruneBlocklets.addAll(dataMapList.get(i).prune(filterExp,
segmentPropertiesFetcher.getSegmentProperties(segmentDataMapGroup.getSegment()),
partitions));
List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(filterExp,
segmentProperties,
partitions);
pruneBlocklets.addAll(addSegmentId(blockletDetailsFetcher
.getExtendedBlocklets(dmPruneBlocklets, segment),
segment));
}
synchronized (prunedBlockletMap) {
List<Blocklet> pruneBlockletsExisting =
List<ExtendedBlocklet> pruneBlockletsExisting =
prunedBlockletMap.get(segmentDataMapGroup.getSegment());
if (pruneBlockletsExisting != null) {
pruneBlockletsExisting.addAll(pruneBlocklets);
@@ -322,14 +330,8 @@ private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments
throw new RuntimeException(e);
}
}
for (Map.Entry<Segment, List<Blocklet>> entry : prunedBlockletMap.entrySet()) {
try {
blocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(entry.getValue(), entry.getKey()),
entry.getKey().toString()));
} catch (IOException e) {
throw new RuntimeException(e);
}
for (Map.Entry<Segment, List<ExtendedBlocklet>> entry : prunedBlockletMap.entrySet()) {
blocklets.addAll(entry.getValue());
}
return blocklets;
}
@@ -351,9 +353,9 @@ private int getNumOfThreadsForPruning() {
}

private List<ExtendedBlocklet> addSegmentId(List<ExtendedBlocklet> pruneBlocklets,
String segmentId) {
Segment segment) {
for (ExtendedBlocklet blocklet : pruneBlocklets) {
blocklet.setSegmentId(segmentId);
blocklet.setSegment(segment);
}
return pruneBlocklets;
}
@@ -423,7 +425,7 @@ public List<ExtendedBlocklet> prune(List<DataMap> dataMaps, DataMapDistributable
detailedBlocklet.setDataMapWriterPath(blockletwritePath);
serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath);
}
detailedBlocklet.setSegmentId(distributable.getSegment().toString());
detailedBlocklet.setSegment(distributable.getSegment());
detailedBlocklets.add(detailedBlocklet);
}
return detailedBlocklets;
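As the comments in prune() explain, multi-threaded pruning only pays off once the volume of index entries is large: for around 0.1 million files a single-threaded prune already finishes in about a second, and the driver should keep threads free for concurrent queries. A rough sketch of that decision; the threshold constant and method shape are assumptions for illustration, the real condition lives in prune() and getNumOfThreadsForPruning() and may differ:

class PruningHeuristic {
  // Illustrative only: the 100,000-entry threshold is an assumed value,
  // not taken from this commit.
  static boolean pruneInMainThread(int totalFiles, int numOfThreadsForPruning) {
    // Below the threshold, thread startup and coordination cost more than
    // they save, and driver threads are better kept free for other queries.
    final int MULTI_THREAD_MIN_FILES = 100_000;
    return numOfThreadsForPruning == 1 || totalFiles < MULTI_THREAD_MIN_FILES;
  }
}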
@@ -81,19 +81,46 @@ public static FileReader getFileHolder(FileType fileType, Configuration configur
}

public static FileType getFileType(String path) {
String lowerPath = path.toLowerCase();
if (lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
FileType fileType = getFileTypeWithActualPath(path);
if (fileType != null) {
return fileType;
}
fileType = getFileTypeWithLowerCase(path);
if (fileType != null) {
return fileType;
}
return FileType.LOCAL;
}

private static FileType getFileTypeWithLowerCase(String path) {
String lowerCase = path.toLowerCase();
if (lowerCase.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
return FileType.HDFS;
} else if (lowerPath.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
} else if (lowerCase.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
return FileType.ALLUXIO;
} else if (lowerPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
} else if (lowerCase.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
return FileType.VIEWFS;
} else if (lowerPath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
lowerPath.startsWith(CarbonCommonConstants.S3A_PREFIX) ||
lowerPath.startsWith(CarbonCommonConstants.S3_PREFIX)) {
} else if (lowerCase.startsWith(CarbonCommonConstants.S3N_PREFIX) || lowerCase
.startsWith(CarbonCommonConstants.S3A_PREFIX) || lowerCase
.startsWith(CarbonCommonConstants.S3_PREFIX)) {
return FileType.S3;
}
return FileType.LOCAL;
return null;
}

private static FileType getFileTypeWithActualPath(String path) {
if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
return FileType.HDFS;
} else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
return FileType.ALLUXIO;
} else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
return FileType.VIEWFS;
} else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX) || path
.startsWith(CarbonCommonConstants.S3A_PREFIX) || path
.startsWith(CarbonCommonConstants.S3_PREFIX)) {
return FileType.S3;
}
return null;
}

public static CarbonFile getCarbonFile(String path) {
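With the refactor, getFileType() first matches the path prefix case-sensitively, then retries against the lower-cased path, and falls back to LOCAL only when neither pass recognizes a scheme. A few illustrative calls, assuming the prefix constants hold the usual scheme strings ("hdfs://", "s3a://", and so on); the paths are made up:

// Illustrative usage of the refactored FileFactory.getFileType
FileFactory.FileType t1 = FileFactory.getFileType("hdfs://nn:8020/store/t1"); // HDFS (exact-case pass)
FileFactory.FileType t2 = FileFactory.getFileType("HDFS://nn:8020/store/t1"); // HDFS (lower-case fallback)
FileFactory.FileType t3 = FileFactory.getFileType("s3a://bucket/store/t1");   // S3
FileFactory.FileType t4 = FileFactory.getFileType("/tmp/store/t1");           // LOCAL (no scheme matched)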
