
Commit 7637483
Merge 1a05263 into 91a7998 (2 parents: 91a7998 + 1a05263)
kunal642 committed Jul 8, 2019
Showing 12 changed files with 261 additions and 122 deletions.
DataMapJob.java

@@ -35,4 +35,6 @@ public interface DataMapJob extends Serializable {
 
   List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat);
 
+  Long executeCountJob(DistributableDataMapFormat dataMapFormat);
+
 }
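
The new executeCountJob contract lets an implementation answer a count(*) query directly from pruned datamap metadata instead of materializing blocklet details. Below is a minimal sketch of one way to satisfy it, assuming the implementation can reuse its own execute() pruning path; the class name and delegation strategy are illustrative, and the import paths are inferred from packages visible elsewhere in this diff.

// Sketch (not from this commit): answer executeCountJob by reusing the
// regular execute() pruning path and summing per-blocklet row counts.
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;

public abstract class CountCapableDataMapJob implements DataMapJob {

  @Override
  public Long executeCountJob(DistributableDataMapFormat dataMapFormat) {
    // In count-star mode each pruned blocklet carries only a row count and a
    // segment number (see the ExtendedBlocklet changes below), so a sum suffices.
    long total = 0L;
    List<ExtendedBlocklet> blocklets = execute(dataMapFormat);
    for (ExtendedBlocklet blocklet : blocklets) {
      total += blocklet.getRowCount();
    }
    return total;
  }
}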
DataMapUtil.java

@@ -229,7 +229,7 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
       List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
       List<String> segmentsToBeRefreshed) throws IOException {
     return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments,
-        invalidSegments, level, false, segmentsToBeRefreshed);
+        invalidSegments, level, false, segmentsToBeRefreshed, false);
   }
 
   /**
@@ -240,7 +240,8 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
   public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
       FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune,
       List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
-      Boolean isFallbackJob, List<String> segmentsToBeRefreshed) throws IOException {
+      Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob)
+      throws IOException {
     List<String> invalidSegmentNo = new ArrayList<>();
     for (Segment segment : invalidSegments) {
       invalidSegmentNo.add(segment.getSegmentNo());
@@ -249,9 +250,11 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
     DistributableDataMapFormat dataMapFormat =
         new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
             partitionsToPrune, false, level, isFallbackJob);
-    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat);
-    // Apply expression on the blocklets.
-    return prunedBlocklets;
+    if (isCountJob) {
+      dataMapFormat.setCountStarJob();
+      dataMapFormat.setIsWriteToFile(false);
+    }
+    return dataMapJob.execute(dataMapFormat);
   }
 
   public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
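
With the new flag threaded through both overloads, a caller opts into the count path by passing true as the trailing isCountJob argument and totals the results on the driver. A hedged sketch of such a call site follows; the variable names and the aggregation step are placeholders for whatever the real caller has in scope, not code from this commit.

// Illustrative call site: run the datamap job in count-star mode, then total
// the per-blocklet row counts. All identifiers are assumed to be in scope.
List<ExtendedBlocklet> blocklets = DataMapUtil.executeDataMapJob(carbonTable, resolver,
    dataMapJob, partitionsToPrune, validSegments, invalidSegments, level,
    false /* isFallbackJob */, segmentsToBeRefreshed, true /* isCountJob */);
long totalRowCount = 0L;
for (ExtendedBlocklet blocklet : blocklets) {
  totalRowCount += blocklet.getRowCount();
}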
DistributableDataMapFormat.java

@@ -28,7 +28,6 @@
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -91,6 +90,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet> {
 
   private boolean isWriteToFile = true;
 
+  private boolean isCountStarJob = false;
+
   DistributableDataMapFormat() {
 
   }
@@ -103,7 +104,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet> {
     this.dataMapToClear = dataMapToClear;
   }
 
-  DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
+  public DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
       List<Segment> validSegments, List<String> invalidSegments, List<PartitionSpec> partitions,
       boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob)
       throws IOException {
@@ -136,7 +137,6 @@ public RecordReader<Void, ExtendedBlocklet> createRecordReader(InputSplit inputSplit,
     return new RecordReader<Void, ExtendedBlocklet>() {
       private Iterator<ExtendedBlocklet> blockletIterator;
       private ExtendedBlocklet currBlocklet;
-      private List<DataMap> dataMaps;
 
       @Override
       public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -149,7 +149,6 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           if (dataMapLevel == null) {
             TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
                 .getDataMap(table, distributable.getDistributable().getDataMapSchema());
-            dataMaps = defaultDataMap.getTableDataMaps(distributable.getDistributable());
             blocklets = defaultDataMap
                 .prune(segmentsToLoad, new DataMapFilter(filterResolverIntf), partitions);
             blocklets = DataMapUtil
@@ -192,11 +191,6 @@ public float getProgress() throws IOException, InterruptedException {
 
       @Override
       public void close() throws IOException {
-        if (null != dataMaps) {
-          for (DataMap dataMap : dataMaps) {
-            dataMap.finish();
-          }
-        }
       }
     };
   }
@@ -247,6 +241,7 @@ public void write(DataOutput out) throws IOException {
     out.writeUTF(taskGroupDesc);
     out.writeUTF(queryId);
     out.writeBoolean(isWriteToFile);
+    out.writeBoolean(isCountStarJob);
   }
 
   @Override
@@ -292,6 +287,7 @@ public void readFields(DataInput in) throws IOException {
     this.taskGroupDesc = in.readUTF();
     this.queryId = in.readUTF();
     this.isWriteToFile = in.readBoolean();
+    this.isCountStarJob = in.readBoolean();
   }
 
   private void initReadCommittedScope() throws IOException {
@@ -403,4 +399,12 @@ public void createDataMapChooser() throws IOException {
       this.dataMapChooser = new DataMapChooser(table);
     }
   }
+
+  public void setCountStarJob() {
+    this.isCountStarJob = true;
+  }
+
+  public boolean isCountStarJob() {
+    return this.isCountStarJob;
+  }
 }
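
Because this format is shipped to executors as a Hadoop Writable, the new isCountStarJob flag is appended at the same position in both write() and readFields(); a mismatch in that ordering would corrupt every field read after it. The round-trip sketch below illustrates the contract under stated assumptions: same-package access to the package-private no-arg constructor, a fully initialized format instance so write() meets no null fields, and the usual java.io stream classes imported.

// Round-trip sketch (not a test from this commit): whatever write() emits,
// readFields() must consume in the same order, including the new flag.
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
format.setCountStarJob();                      // flip the new flag
format.write(new DataOutputStream(bytes));     // driver-side serialization

DistributableDataMapFormat copy = new DistributableDataMapFormat();
copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
assert copy.isCountStarJob();                  // the flag survived the round trip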
ExtendedBlocklet.java

@@ -39,6 +39,10 @@ public class ExtendedBlocklet extends Blocklet {
 
   private CarbonInputSplit inputSplit;
 
+  private Long count;
+
+  private String segmentNo;
+
   public ExtendedBlocklet() {
 
   }
@@ -78,6 +82,9 @@ public long getLength() {
   }
 
   public String getSegmentId() {
+    if (segmentNo != null) {
+      return segmentNo;
+    }
     return this.inputSplit.getSegmentId();
   }
 
@@ -92,8 +99,12 @@ public String getPath() {
     return getFilePath();
   }
 
-  public String getDataMapWriterPath() {
-    return this.inputSplit.getDataMapWritePath();
+  public Long getRowCount() {
+    if (count != null) {
+      return count;
+    } else {
+      return (long) inputSplit.getRowCount();
+    }
   }
 
   public void setDataMapWriterPath(String dataMapWriterPath) {
@@ -161,31 +172,35 @@ public void setColumnSchema(List<ColumnSchema> columnSchema) {
    * @param uniqueLocation
    * @throws IOException
    */
-  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
+  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob)
       throws IOException {
     super.write(out);
-    if (dataMapUniqueId == null) {
-      out.writeBoolean(false);
+    if (isCountJob) {
+      out.writeLong(inputSplit.getRowCount());
+      out.writeUTF(inputSplit.getSegmentId());
     } else {
-      out.writeBoolean(true);
-      out.writeUTF(dataMapUniqueId);
-    }
-    out.writeBoolean(inputSplit != null);
-    if (inputSplit != null) {
-      // creating byte array output stream to get the size of input split serializeData size
-      ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
-      DataOutputStream dos = new DataOutputStream(ebos);
-      inputSplit.setFilePath(null);
-      inputSplit.setBucketId(null);
-      inputSplit.setWriteDeleteDelta(false);
-      if (inputSplit.isBlockCache()) {
-        inputSplit.updateFooteroffset();
-        inputSplit.updateBlockLength();
-        inputSplit.setWriteDetailInfo(false);
+      if (dataMapUniqueId == null) {
+        out.writeBoolean(false);
+      } else {
+        out.writeBoolean(true);
+        out.writeUTF(dataMapUniqueId);
+      }
+      out.writeBoolean(inputSplit != null);
+      if (inputSplit != null) {
+        // creating byte array output stream to get the size of input split serializeData size
+        ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(ebos);
+        inputSplit.setFilePath(null);
+        inputSplit.setBucketId(null);
+        if (inputSplit.isBlockCache()) {
+          inputSplit.updateFooteroffset();
+          inputSplit.updateBlockLength();
+          inputSplit.setWriteDetailInfo(false);
+        }
+        inputSplit.serializeFields(dos, uniqueLocation);
+        out.writeInt(ebos.size());
+        out.write(ebos.getBuffer(), 0, ebos.size());
      }
-      inputSplit.serializeFields(dos, uniqueLocation);
-      out.writeInt(ebos.size());
-      out.write(ebos.getBuffer(), 0 , ebos.size());
     }
   }
 
@@ -196,9 +211,15 @@ public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
    * @param tablePath
    * @throws IOException
    */
-  public void deserializeFields(DataInput in, String[] locations, String tablePath)
+  public void deserializeFields(DataInput in, String[] locations, String tablePath,
+      boolean isCountJob)
       throws IOException {
     super.readFields(in);
+    if (isCountJob) {
+      count = in.readLong();
+      segmentNo = in.readUTF();
+      return;
+    }
     if (in.readBoolean()) {
       dataMapUniqueId = in.readUTF();
     }
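
On the driver side, blocklets deserialized from a count job carry only a segment number and a row count, surfaced through the getSegmentId() and getRowCount() fallbacks added above. The helper below is an illustrative sketch, not part of this commit, showing how those two accessors combine into a per-segment tally.

// Sketch: aggregate count-job results by grouping pruned blocklets per
// segment and summing their row counts.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.indexstore.ExtendedBlocklet;

public final class CountAggregator {
  private CountAggregator() { }

  public static Map<String, Long> rowCountBySegment(List<ExtendedBlocklet> blocklets) {
    Map<String, Long> counts = new HashMap<>();
    for (ExtendedBlocklet blocklet : blocklets) {
      // getSegmentId() prefers the deserialized segmentNo for count jobs;
      // getRowCount() prefers the deserialized count over the input split's.
      counts.merge(blocklet.getSegmentId(), blocklet.getRowCount(), Long::sum);
    }
    return counts;
  }
}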
(Diffs for the remaining 8 changed files were not loaded on this page.)
