Merge 57c6b18 into e14c817
kunal642 committed Aug 6, 2019
2 parents e14c817 + 57c6b18 commit 39b00c6
Showing 13 changed files with 317 additions and 81 deletions.
@@ -30,4 +30,8 @@ public abstract class AbstractDataMapJob implements DataMapJob {
@Override public void execute(CarbonTable carbonTable,
FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
}

@Override public Long executeCountJob(DistributableDataMapFormat dataMapFormat) {
return null;
}
}
@@ -35,4 +35,6 @@ public interface DataMapJob extends Serializable {

List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat);

Long executeCountJob(DistributableDataMapFormat dataMapFormat);

}
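
Note: this part of the commit only stubs the new contract (AbstractDataMapJob.executeCountJob returns null above). As a minimal, hypothetical sketch of what a concrete implementation could return, assuming the pruned blocklets expose non-null row counts through the getRowCount() accessor added later in this commit:

import java.util.List;

import org.apache.carbondata.core.indexstore.ExtendedBlocklet;

// Hypothetical helper, not part of this commit: folds an already-pruned
// blocklet list into a count(*) result via the new getRowCount() accessor.
final class CountStarSketch {
  static Long sumRowCounts(List<ExtendedBlocklet> prunedBlocklets) {
    long total = 0L;
    for (ExtendedBlocklet blocklet : prunedBlocklets) {
      total += blocklet.getRowCount();
    }
    return total;
  }
}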
@@ -230,7 +230,7 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
List<String> segmentsToBeRefreshed) throws IOException {
return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments,
invalidSegments, level, false, segmentsToBeRefreshed);
invalidSegments, level, false, segmentsToBeRefreshed, false);
}

/**
@@ -241,7 +241,8 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune,
List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
Boolean isFallbackJob, List<String> segmentsToBeRefreshed) throws IOException {
Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob)
throws IOException {
List<String> invalidSegmentNo = new ArrayList<>();
for (Segment segment : invalidSegments) {
invalidSegmentNo.add(segment.getSegmentNo());
@@ -250,9 +251,11 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
DistributableDataMapFormat dataMapFormat =
new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
partitionsToPrune, false, level, isFallbackJob);
List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat);
// Apply expression on the blocklets.
return prunedBlocklets;
if (isCountJob) {
dataMapFormat.setCountStarJob();
dataMapFormat.setIsWriteToFile(false);
}
return dataMapJob.execute(dataMapFormat);
}
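
Note: a hedged sketch of a call site for the widened overload; the host class name DataMapUtil, the null data map level, and the surrounding query plumbing are assumptions rather than something this hunk shows.

// Illustrative fragment only (imports omitted). Assumes table, resolver,
// dataMapJob, partitions, validSegments and invalidSegments are already
// prepared by the caller.
List<ExtendedBlocklet> prunedBlocklets = DataMapUtil.executeDataMapJob(
    table, resolver, dataMapJob, partitions, validSegments, invalidSegments,
    null,                               // dataMapLevel: null selects the default data map
    false,                              // isFallbackJob
    new java.util.ArrayList<String>(),  // segmentsToBeRefreshed
    true);                              // isCountJob: count-star mode, write-to-file disabled
// With isCountJob = true each returned blocklet carries only its segment
// number and row count (see ExtendedBlocklet.deserializeFields below).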

public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
@@ -28,7 +28,6 @@

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -91,6 +90,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl

private boolean isWriteToFile = true;

private boolean isCountStarJob = false;

DistributableDataMapFormat() {

}
@@ -103,7 +104,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
this.dataMapToClear = dataMapToClear;
}

DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
public DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
List<Segment> validSegments, List<String> invalidSegments, List<PartitionSpec> partitions,
boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob)
throws IOException {
@@ -136,7 +137,6 @@ public RecordReader<Void, ExtendedBlocklet> createRecordReader(InputSplit inputS
return new RecordReader<Void, ExtendedBlocklet>() {
private Iterator<ExtendedBlocklet> blockletIterator;
private ExtendedBlocklet currBlocklet;
private List<DataMap> dataMaps;

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -149,7 +149,6 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
if (dataMapLevel == null) {
TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
.getDataMap(table, distributable.getDistributable().getDataMapSchema());
dataMaps = defaultDataMap.getTableDataMaps(distributable.getDistributable());
blocklets = defaultDataMap
.prune(segmentsToLoad, new DataMapFilter(filterResolverIntf), partitions);
blocklets = DataMapUtil
@@ -192,11 +191,6 @@ public float getProgress() throws IOException, InterruptedException {

@Override
public void close() throws IOException {
if (null != dataMaps) {
for (DataMap dataMap : dataMaps) {
dataMap.finish();
}
}
}
};
}
@@ -247,6 +241,7 @@ public void write(DataOutput out) throws IOException {
out.writeUTF(taskGroupDesc);
out.writeUTF(queryId);
out.writeBoolean(isWriteToFile);
out.writeBoolean(isCountStarJob);
}

@Override
@@ -292,6 +287,7 @@ public void readFields(DataInput in) throws IOException {
this.taskGroupDesc = in.readUTF();
this.queryId = in.readUTF();
this.isWriteToFile = in.readBoolean();
this.isCountStarJob = in.readBoolean();
}

private void initReadCommittedScope() throws IOException {
@@ -398,9 +394,29 @@ public List<String> getValidSegmentIds() {
return validSegments;
}

public List<Segment> getValidSegments() {
return validSegments;
}

public void createDataMapChooser() throws IOException {
if (null != filterResolverIntf) {
this.dataMapChooser = new DataMapChooser(table);
}
}

public void setCountStarJob() {
this.isCountStarJob = true;
}

public boolean isCountStarJob() {
return this.isCountStarJob;
}

public List<PartitionSpec> getPartitions() {
return partitions;
}

public ReadCommittedScope getReadCommittedScope() {
return readCommittedScope;
}
}
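
Note: DistributableDataMapFormat travels between driver and index server through write()/readFields(), so the new isCountStarJob flag has to be written and read at the same relative position; the hunks above append it right after isWriteToFile on both sides. A minimal round-trip sketch, assuming an already-populated instance and same-package access to the package-private no-arg constructor:

// Illustrative fragment only: serialize a configured format with the
// count-star flag set, then restore it into a fresh instance.
java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
dataMapFormat.setCountStarJob();
dataMapFormat.write(new java.io.DataOutputStream(buffer));

DistributableDataMapFormat copy = new DistributableDataMapFormat();
copy.readFields(new java.io.DataInputStream(
    new java.io.ByteArrayInputStream(buffer.toByteArray())));
// copy.isCountStarJob() is expected to report true after deserialization.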
@@ -39,6 +39,10 @@ public class ExtendedBlocklet extends Blocklet {

private CarbonInputSplit inputSplit;

private Long count;

private String segmentNo;

public ExtendedBlocklet() {

}
@@ -78,6 +82,9 @@ public long getLength() {
}

public String getSegmentId() {
if (segmentNo != null) {
return segmentNo;
}
return this.inputSplit.getSegmentId();
}

@@ -92,8 +99,12 @@ public String getPath() {
return getFilePath();
}

public String getDataMapWriterPath() {
return this.inputSplit.getDataMapWritePath();
public Long getRowCount() {
if (count != null) {
return count;
} else {
return (long) inputSplit.getRowCount();
}
}

public void setDataMapWriterPath(String dataMapWriterPath) {
@@ -161,30 +172,35 @@ public void setColumnSchema(List<ColumnSchema> columnSchema) {
* @param uniqueLocation
* @throws IOException
*/
public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob)
throws IOException {
super.write(out);
if (dataMapUniqueId == null) {
out.writeBoolean(false);
if (isCountJob) {
out.writeLong(inputSplit.getRowCount());
out.writeUTF(inputSplit.getSegmentId());
} else {
out.writeBoolean(true);
out.writeUTF(dataMapUniqueId);
}
out.writeBoolean(inputSplit != null);
if (inputSplit != null) {
// creating byte array output stream to get the size of input split serializeData size
ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(ebos);
inputSplit.setFilePath(null);
inputSplit.setBucketId(null);
if (inputSplit.isBlockCache()) {
inputSplit.updateFooteroffset();
inputSplit.updateBlockLength();
inputSplit.setWriteDetailInfo(false);
if (dataMapUniqueId == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(dataMapUniqueId);
}
out.writeBoolean(inputSplit != null);
if (inputSplit != null) {
// creating byte array output stream to get the size of input split serializeData size
ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(ebos);
inputSplit.setFilePath(null);
inputSplit.setBucketId(null);
if (inputSplit.isBlockCache()) {
inputSplit.updateFooteroffset();
inputSplit.updateBlockLength();
inputSplit.setWriteDetailInfo(false);
}
inputSplit.serializeFields(dos, uniqueLocation);
out.writeInt(ebos.size());
out.write(ebos.getBuffer(), 0, ebos.size());
}
inputSplit.serializeFields(dos, uniqueLocation);
out.writeInt(ebos.size());
out.write(ebos.getBuffer(), 0 , ebos.size());
}
}

@@ -195,9 +211,15 @@ public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
* @param tablePath
* @throws IOException
*/
public void deserializeFields(DataInput in, String[] locations, String tablePath)
public void deserializeFields(DataInput in, String[] locations, String tablePath,
boolean isCountJob)
throws IOException {
super.readFields(in);
if (isCountJob) {
count = in.readLong();
segmentNo = in.readUTF();
return;
}
if (in.readBoolean()) {
dataMapUniqueId = in.readUTF();
}
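
Note: for a count job the per-blocklet payload written after the Blocklet header collapses to just a row count and a segment number, which is what lets deserializeFields return early. A tiny sketch of that wire pair, using only java.io streams (illustrative, not from this commit):

// Write side, mirroring the isCountJob branch of serializeData:
java.io.ByteArrayOutputStream bytes = new java.io.ByteArrayOutputStream();
java.io.DataOutputStream out = new java.io.DataOutputStream(bytes);
out.writeLong(12345L);      // inputSplit.getRowCount()
out.writeUTF("2");          // inputSplit.getSegmentId()

// Read side, mirroring the isCountJob branch of deserializeFields:
java.io.DataInputStream in = new java.io.DataInputStream(
    new java.io.ByteArrayInputStream(bytes.toByteArray()));
long count = in.readLong();
String segmentNo = in.readUTF();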
@@ -56,25 +56,21 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {

private static final Logger LOGGER =
LogServiceFactory.getLogService(ExtendedBlockletWrapper.class.getName());

private static final int BUFFER_SIZE = 8 * 1024 * 1024;
private static final int BLOCK_SIZE = 256 * 1024 * 1024;
private boolean isWrittenToFile;

private int dataSize;

private byte[] bytes;

private static final int BUFFER_SIZE = 8 * 1024 * 1024;

private static final int BLOCK_SIZE = 256 * 1024 * 1024;
private long count;

public ExtendedBlockletWrapper() {

}

public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, String tablePath,
String queryId, boolean isWriteToFile) {
String queryId, boolean isWriteToFile, boolean isCountJob) {
Map<String, Short> uniqueLocations = new HashMap<>();
byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList);
byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList, isCountJob);
int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024;
@@ -122,13 +118,13 @@ public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, Stri
}

private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocations,
List<ExtendedBlocklet> extendedBlockletList) {
List<ExtendedBlocklet> extendedBlockletList, boolean isCountJob) {
ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
DataOutputStream stream = new DataOutputStream(bos);
try {
for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) {
extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
extendedBlocklet.serializeData(stream, uniqueLocations);
extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob);
}
return new SnappyCompressor().compressByte(bos.toByteArray());
} catch (IOException e) {
@@ -142,6 +138,7 @@ private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocatio
* Below method will be used to write the data to stream[file/memory]
* Data Format
* <number of splits><number of unique location[short]><locations><serialize data len><data>
*
* @param stream
* @param data
* @param uniqueLocation
@@ -158,7 +155,7 @@ private void writeBlockletToStream(DataOutputStream stream, byte[] data,
final Map.Entry<String, Short> next = iterator.next();
uniqueLoc[next.getValue()] = next.getKey();
}
stream.writeShort((short)uniqueLoc.length);
stream.writeShort((short) uniqueLoc.length);
for (String loc : uniqueLoc) {
stream.writeUTF(loc);
}
@@ -170,12 +167,14 @@
deserialize the blocklet data from file or stream
* data format
* <number of splits><number of unique location[short]><locations><serialize data len><data>
*
* @param tablePath
* @param queryId
* @return
* @throws IOException
*/
public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) throws IOException {
public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId, boolean isCountJob)
throws IOException {
byte[] data;
if (bytes != null) {
if (isWrittenToFile) {
@@ -218,7 +217,7 @@ public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) thr
try {
for (int i = 0; i < numberOfBlocklet; i++) {
ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet();
extendedBlocklet.deserializeFields(eDIS, locations, tablePath);
extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob);
extendedBlockletList.add(extendedBlocklet);
}
} finally {
@@ -248,4 +247,8 @@ public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) thr
}
this.dataSize = in.readInt();
}

public long getCount() {
return count;
}
}
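
Note: the wrapper javadoc above documents the stream layout as <number of splits><number of unique locations (short)><locations><serialized data length><data>. A hedged decoder sketch following that documented order; field widths outside the visible hunks are assumptions, and the real readBlocklet additionally handles the file-spill path (isWrittenToFile) and, presumably, the matching Snappy decompression:

// Illustrative decoder for the documented wrapper layout, not from this commit.
static void readWrapperLayout(java.io.DataInputStream in) throws java.io.IOException {
  int numberOfBlocklets = in.readInt();        // <number of splits>; width assumed
  short numberOfLocations = in.readShort();    // <number of unique locations>
  String[] locations = new String[numberOfLocations];
  for (int i = 0; i < numberOfLocations; i++) {
    locations[i] = in.readUTF();               // <locations>
  }
  int dataLength = in.readInt();               // <serialized data length>
  byte[] blockletData = new byte[dataLength];
  in.readFully(blockletData);                  // <data>
  // Each blocklet inside blockletData is then rebuilt with
  // ExtendedBlocklet.deserializeFields(..., isCountJob).
}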
