Skip to content

Commit

Permalink
Merge d847a8a into 66982f3
Browse files Browse the repository at this point in the history
  • Loading branch information
kumarvishal09 committed Mar 20, 2019
2 parents 66982f3 + d847a8a commit a078ee2
Show file tree
Hide file tree
Showing 30 changed files with 732 additions and 617 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public RecordReader<Void, ExtendedBlocklet> createRecordReader(InputSplit inputS
distributable.getDistributable(),
dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
for (ExtendedBlocklet blocklet : blocklets) {
blocklet.getDetailInfo();
blocklet.setDataMapUniqueId(distributable.getUniqueId());
}
blockletIterator = blocklets.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, Expression filterExp
}
blocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
segment.toString()));
segment));
}
return blocklets;
}
Expand All @@ -146,15 +146,11 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolver
final List<ExtendedBlocklet> blocklets = new ArrayList<>();
final Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
// for non-filter queries
if (filterExp == null) {
// if filter is not passed, then return all the blocklets.
return pruneWithoutFilter(segments, partitions, blocklets);
}
// for filter queries
int totalFiles = 0;
int datamapsCount = 0;
for (Segment segment : segments) {
for (DataMap dataMap : dataMaps.get(segment)) {
for (DataMap dataMap: dataMaps.get(segment)) {
totalFiles += dataMap.getNumberOfEntries();
datamapsCount++;
}
Expand All @@ -166,11 +162,16 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolver
// As 0.1 million files block pruning can take only 1 second.
// Doing multi-thread for smaller values is not recommended as
// driver should have minimum threads opened to support multiple concurrent queries.
if (filterExp == null) {
// if filter is not passed, then return all the blocklets.
return pruneWithoutFilter(segments, partitions, blocklets);
}
return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps);
}
// handle by multi-thread
return pruneWithFilterMultiThread(segments, filterExp, partitions, blocklets, dataMaps,
totalFiles);
List<ExtendedBlocklet> extendedBlocklets =
pruneMultiThread(segments, filterExp, partitions, blocklets, dataMaps, totalFiles);
return extendedBlocklets;
}

private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
Expand All @@ -179,7 +180,7 @@ private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
List<Blocklet> allBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
blocklets.addAll(
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(allBlocklets, segment),
segment.toString()));
segment));
}
return blocklets;
}
Expand All @@ -195,12 +196,12 @@ private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
}
blocklets.addAll(
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
segment.toString()));
segment));
}
return blocklets;
}

private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments,
private List<ExtendedBlocklet> pruneMultiThread(List<Segment> segments,
final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
int totalFiles) {
Expand Down Expand Up @@ -277,7 +278,8 @@ private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments
throw new RuntimeException(" not all the files processed ");
}
List<Future<Void>> results = new ArrayList<>(numOfThreadsForPruning);
final Map<Segment, List<Blocklet>> prunedBlockletMap = new ConcurrentHashMap<>(segments.size());
final Map<Segment, List<ExtendedBlocklet>> prunedBlockletMap =
new ConcurrentHashMap<>(segments.size());
final ExecutorService executorService = Executors.newFixedThreadPool(numOfThreadsForPruning);
final String threadName = Thread.currentThread().getName();
for (int i = 0; i < numOfThreadsForPruning; i++) {
Expand All @@ -286,16 +288,22 @@ private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments
@Override public Void call() throws IOException {
Thread.currentThread().setName(threadName);
for (SegmentDataMapGroup segmentDataMapGroup : segmentDataMapGroups) {
List<Blocklet> pruneBlocklets = new ArrayList<>();
List<ExtendedBlocklet> pruneBlocklets = new ArrayList<>();
List<DataMap> dataMapList = dataMaps.get(segmentDataMapGroup.getSegment());
SegmentProperties segmentProperties =
segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0));
Segment segment = segmentDataMapGroup.getSegment();
for (int i = segmentDataMapGroup.getFromIndex();
i <= segmentDataMapGroup.getToIndex(); i++) {
pruneBlocklets.addAll(dataMapList.get(i).prune(filterExp,
segmentPropertiesFetcher.getSegmentProperties(segmentDataMapGroup.getSegment()),
partitions));
List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(filterExp,
segmentProperties,
partitions);
pruneBlocklets.addAll(addSegmentId(blockletDetailsFetcher
.getExtendedBlocklets(dmPruneBlocklets, segment),
segment));
}
synchronized (prunedBlockletMap) {
List<Blocklet> pruneBlockletsExisting =
List<ExtendedBlocklet> pruneBlockletsExisting =
prunedBlockletMap.get(segmentDataMapGroup.getSegment());
if (pruneBlockletsExisting != null) {
pruneBlockletsExisting.addAll(pruneBlocklets);
Expand All @@ -322,14 +330,8 @@ private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments
throw new RuntimeException(e);
}
}
for (Map.Entry<Segment, List<Blocklet>> entry : prunedBlockletMap.entrySet()) {
try {
blocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(entry.getValue(), entry.getKey()),
entry.getKey().toString()));
} catch (IOException e) {
throw new RuntimeException(e);
}
for (Map.Entry<Segment, List<ExtendedBlocklet>> entry : prunedBlockletMap.entrySet()) {
blocklets.addAll(entry.getValue());
}
return blocklets;
}
Expand All @@ -351,9 +353,9 @@ private int getNumOfThreadsForPruning() {
}

private List<ExtendedBlocklet> addSegmentId(List<ExtendedBlocklet> pruneBlocklets,
String segmentId) {
Segment segment) {
for (ExtendedBlocklet blocklet : pruneBlocklets) {
blocklet.setSegmentId(segmentId);
blocklet.setSegment(segment);
}
return pruneBlocklets;
}
Expand Down Expand Up @@ -423,7 +425,7 @@ public List<ExtendedBlocklet> prune(List<DataMap> dataMaps, DataMapDistributable
detailedBlocklet.setDataMapWriterPath(blockletwritePath);
serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath);
}
detailedBlocklet.setSegmentId(distributable.getSegment().toString());
detailedBlocklet.setSegment(distributable.getSegment());
detailedBlocklets.add(detailedBlocklet);
}
return detailedBlocklets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,46 @@ public static FileReader getFileHolder(FileType fileType, Configuration configur
}

public static FileType getFileType(String path) {
String lowerPath = path.toLowerCase();
if (lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
FileType fileType = getFileTypeWithActualPath(path);
if (fileType != null) {
return fileType;
}
fileType = getFileTypeWithLowerCase(path);
if (fileType != null) {
return fileType;
}
return FileType.LOCAL;
}

/**
 * Case-insensitive prefix match of the path against the known URL schemes
 * (HDFS, Alluxio, ViewFS, S3 variants).
 *
 * <p>Fix: removed interleaved stale branches from the pre-change version
 * (references to the removed {@code lowerPath} local and a stale
 * {@code return FileType.LOCAL;}).
 *
 * <p>NOTE(review): {@code toLowerCase()} uses the default locale; consider
 * {@code toLowerCase(Locale.ROOT)} so scheme matching is not affected by
 * locale-sensitive casing (e.g. the Turkish dotless 'i') — confirm with
 * project conventions before changing.
 *
 * @param path file path to inspect
 * @return the matching file type, or {@code null} when no known prefix matches
 */
private static FileType getFileTypeWithLowerCase(String path) {
  String lowerCase = path.toLowerCase();
  if (lowerCase.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
    return FileType.HDFS;
  } else if (lowerCase.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
    return FileType.ALLUXIO;
  } else if (lowerCase.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
    return FileType.VIEWFS;
  } else if (lowerCase.startsWith(CarbonCommonConstants.S3N_PREFIX)
      || lowerCase.startsWith(CarbonCommonConstants.S3A_PREFIX)
      || lowerCase.startsWith(CarbonCommonConstants.S3_PREFIX)) {
    return FileType.S3;
  }
  return null;
}

/**
 * Case-sensitive prefix match of the path against the known URL schemes
 * (HDFS, Alluxio, ViewFS, S3 variants), using the path exactly as given.
 *
 * @param path file path to inspect
 * @return the matching file type, or {@code null} when no known prefix matches
 */
private static FileType getFileTypeWithActualPath(String path) {
  if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
    return FileType.HDFS;
  }
  if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
    return FileType.ALLUXIO;
  }
  if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
    return FileType.VIEWFS;
  }
  boolean isS3Scheme = path.startsWith(CarbonCommonConstants.S3N_PREFIX)
      || path.startsWith(CarbonCommonConstants.S3A_PREFIX)
      || path.startsWith(CarbonCommonConstants.S3_PREFIX);
  return isS3Scheme ? FileType.S3 : null;
}

public static CarbonFile getCarbonFile(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,70 +16,79 @@
*/
package org.apache.carbondata.core.indexstore;

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.hadoop.CarbonInputSplit;

/**
* Detailed blocklet information
*/
public class ExtendedBlocklet extends Blocklet {

private String segmentId;

private BlockletDetailInfo detailInfo;

private long length;

private String[] location;

private String dataMapWriterPath;

private String dataMapUniqueId;

public ExtendedBlocklet(String filePath, String blockletId) {
super(filePath, blockletId);
}
private CarbonInputSplit inputSplit;

/**
 * Builds a blocklet backed by a {@link CarbonInputSplit} constructed from the
 * given file path and blocklet id.
 *
 * <p>Fix: removed the stale pre-change signature line (old 3-arg form without
 * {@code version}) that was interleaved with the new signature.
 *
 * @param filePath carbondata file path of the blocklet
 * @param blockletId id of the blocklet inside the file
 * @param compareBlockletIdForObjectMatching whether equals() compares blocklet ids
 * @param version columnar format version used to build the input split
 */
public ExtendedBlocklet(String filePath, String blockletId,
    boolean compareBlockletIdForObjectMatching, ColumnarFormatVersion version) {
  super(filePath, blockletId, compareBlockletIdForObjectMatching);
  try {
    this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, 0, version, null);
  } catch (IOException e) {
    // split construction failure is unrecoverable here; surface as unchecked
    throw new RuntimeException(e);
  }
}

public BlockletDetailInfo getDetailInfo() {
return detailInfo;
public ExtendedBlocklet(String filePath, String blockletId, ColumnarFormatVersion version) {
this(filePath, blockletId, true, version);
}

public void setDetailInfo(BlockletDetailInfo detailInfo) {
this.detailInfo = detailInfo;
public BlockletDetailInfo getDetailInfo() {
return this.inputSplit.getDetailInfo();
}

public void setLocation(String[] location) {
this.location = location;
public void setDataMapRow(DataMapRow dataMapRow) {
this.inputSplit.setDataMapRow(dataMapRow);
}

public String[] getLocations() {
return location;
try {
return this.inputSplit.getLocations();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public long getLength() {
return length;
return this.inputSplit.getLength();
}

public String getSegmentId() {
return segmentId;
return this.inputSplit.getSegmentId();
}

public void setSegmentId(String segmentId) {
this.segmentId = segmentId;
public Segment getSegment() {
return this.inputSplit.getSegment();
}
public void setSegment(Segment segment) {
this.inputSplit.setSegment(segment);
}

/**
 * @return the file path of this blocklet; alias for {@link #getFilePath()}
 */
public String getPath() {
  return getFilePath();
}

/**
 * @return the datamap writer output path stored on the underlying input split
 *
 * <p>Fix: removed the stale pre-change line {@code return dataMapWriterPath;}
 * (field removed by this commit), which made the real return unreachable.
 */
public String getDataMapWriterPath() {
  return this.inputSplit.getDataMapWritePath();
}

/**
 * Stores the datamap writer output path on the underlying input split.
 *
 * <p>Fix: removed the stale pre-change assignment to the removed
 * {@code dataMapWriterPath} field that was interleaved with the new body.
 *
 * @param dataMapWriterPath path written by the fine-grain datamap writer
 */
public void setDataMapWriterPath(String dataMapWriterPath) {
  this.inputSplit.setDataMapWritePath(dataMapWriterPath);
}

public String getDataMapUniqueId() {
Expand All @@ -98,13 +107,41 @@ public void setDataMapUniqueId(String dataMapUniqueId) {
}

ExtendedBlocklet that = (ExtendedBlocklet) o;

return segmentId != null ? segmentId.equals(that.segmentId) : that.segmentId == null;
return inputSplit.getSegmentId() != null ?
inputSplit.getSegmentId().equals(that.inputSplit.getSegmentId()) :
that.inputSplit.getSegmentId() == null;
}

/**
 * Hash code combining the parent hash with the segment id taken from the
 * underlying input split, consistent with equals().
 *
 * <p>Fix: removed the stale pre-change line hashing the removed
 * {@code segmentId} field, which mixed the hash twice.
 */
@Override public int hashCode() {
  int result = super.hashCode();
  result = 31 * result + (inputSplit.getSegmentId() != null ?
      inputSplit.getSegmentId().hashCode() :
      0);
  return result;
}

/**
 * @return the {@link CarbonInputSplit} backing this blocklet's detail state
 */
public CarbonInputSplit getInputSplit() {
  return inputSplit;
}

/**
 * Forwards the column cardinality array to the underlying input split.
 *
 * @param cardinality per-column cardinality values
 */
public void setColumnCardinality(int[] cardinality) {
  inputSplit.setColumnCardinality(cardinality);
}

/**
 * Forwards the legacy-store flag to the underlying input split.
 *
 * @param isLegacyStore true when the blocklet comes from a legacy store format
 */
public void setLegacyStore(boolean isLegacyStore) {
  inputSplit.setLegacyStore(isLegacyStore);
}

/**
 * Forwards the min/max-pruning flag to the underlying input split.
 *
 * @param useMinMaxForPruning whether min/max statistics may be used for pruning
 */
public void setUseMinMaxForPruning(boolean useMinMaxForPruning) {
  this.inputSplit.setUseMinMaxForPruning(useMinMaxForPruning);
}

/**
 * Forwards the block-cache flag to the underlying input split.
 *
 * @param isBlockCache true when caching is at block (not blocklet) level
 *                     — NOTE(review): semantics inferred from the name; confirm
 */
public void setIsBlockCache(boolean isBlockCache) {
  this.inputSplit.setIsBlockCache(isBlockCache);
}

/**
 * Forwards the column schema list to the underlying input split.
 *
 * @param columnSchema schemas of the columns in this blocklet's file
 */
public void setColumnSchema(List<ColumnSchema> columnSchema) {
  this.inputSplit.setColumnSchema(columnSchema);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;

import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;

/**
Expand All @@ -35,4 +36,6 @@ public interface SegmentPropertiesFetcher {
*/
SegmentProperties getSegmentProperties(Segment segment)
throws IOException;

/**
 * Fetches segment properties via the given datamap instance.
 * NOTE(review): contract inferred from the signature; confirm against implementations.
 *
 * @param coarseGrainDataMap datamap to resolve the segment properties from
 * @return segment properties for the datamap's segment
 * @throws IOException on failure reading the underlying store
 */
SegmentProperties getSegmentPropertiesFromDataMap(DataMap coarseGrainDataMap) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ public class TableBlockIndexUniqueIdentifier implements Serializable {

private String segmentId;

private String uniqueName;

/**
 * Creates an identifier for a block index file within a segment. The unique
 * name (index file path + separator + index file name) is precomputed here so
 * {@code getUniqueTableSegmentIdentifier()} does not rebuild it on every call.
 *
 * @param indexFilePath      directory path of the index file
 * @param indexFileName      name of the index file
 * @param mergeIndexFileName name of the merge index file, if any
 * @param segmentId          id of the segment the index file belongs to
 */
public TableBlockIndexUniqueIdentifier(String indexFilePath, String indexFileName,
    String mergeIndexFileName, String segmentId) {
  this.indexFilePath = indexFilePath;
  this.indexFileName = indexFileName;
  this.mergeIndexFileName = mergeIndexFileName;
  this.segmentId = segmentId;
  this.uniqueName = indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + indexFileName;
}

/**
Expand All @@ -51,7 +54,7 @@ public TableBlockIndexUniqueIdentifier(String indexFilePath, String indexFileNam
* @return
*/
/**
 * @return the unique name of this index file (path + separator + file name),
 *         precomputed once in the constructor
 *
 * <p>Fix: removed the stale pre-change line rebuilding the string inline,
 * which made the cached-field return unreachable.
 */
public String getUniqueTableSegmentIdentifier() {
  return this.uniqueName;
}

public String getIndexFilePath() {
Expand Down
Loading

0 comments on commit a078ee2

Please sign in to comment.