Merge c773e1b into c65cc12
kunal642 committed Jul 23, 2019
2 parents c65cc12 + c773e1b commit bf13834
Showing 13 changed files with 315 additions and 81 deletions.
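This merge wires a count(*) fast path through the datamap pruning flow: DataMapJob gains an executeCountJob method, DistributableDataMapFormat carries an isCountStarJob flag across its Hadoop serialization, and ExtendedBlocklet can serialize just its row count and segment number for such jobs. As a caller-side illustration only, the following sketch (the class name, variable names, and import paths are assumptions, not part of this commit) shows how the updated DataMapUtil.executeDataMapJob overload might be used to answer a count query from pruned blocklets alone:

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapUtil;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

// Hypothetical caller-side sketch; not part of this commit.
public final class CountStarPruningSketch {

  // Run the pruning job with the new isCountJob flag set and sum the
  // per-blocklet row counts instead of scanning the data files.
  public static long countStar(CarbonTable table, FilterResolverIntf resolver,
      DataMapJob dataMapJob, List<PartitionSpec> partitions,
      List<Segment> validSegments, List<Segment> invalidSegments,
      List<String> segmentsToBeRefreshed) throws IOException {
    List<ExtendedBlocklet> prunedBlocklets = DataMapUtil.executeDataMapJob(
        table, resolver, dataMapJob, partitions, validSegments, invalidSegments,
        null /* dataMapLevel: use the default blocklet datamap */,
        false /* isFallbackJob */, segmentsToBeRefreshed, true /* isCountJob */);
    long total = 0L;
    for (ExtendedBlocklet blocklet : prunedBlocklets) {
      total += blocklet.getRowCount();
    }
    return total;
  }
}

When isCountJob is true, the format is told not to write blocklets to file and to behave as a count-star job, which is why summing getRowCount() on the returned blocklets is sufficient in this sketch.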
@@ -30,4 +30,8 @@ public abstract class AbstractDataMapJob implements DataMapJob {
@Override public void execute(CarbonTable carbonTable,
FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
}

@Override public Long executeCountJob(DistributableDataMapFormat dataMapFormat) {
return null;
}
}
@@ -35,4 +35,6 @@ public interface DataMapJob extends Serializable {

List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat);

Long executeCountJob(DistributableDataMapFormat dataMapFormat);

}
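AbstractDataMapJob above returns null from the new executeCountJob hook, so a caller presumably needs a fallback when a concrete job does not implement the dedicated count path. A hypothetical helper illustrating that contract (the helper itself and its import paths are assumptions, not part of this commit):

import java.util.List;

import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;

// Hypothetical fallback helper; not part of this commit.
public final class CountJobFallbackSketch {

  // Prefer the dedicated count job; when it returns null (the
  // AbstractDataMapJob default), prune normally and add up row counts.
  public static long count(DataMapJob job, DistributableDataMapFormat format) {
    Long fromCountJob = job.executeCountJob(format);
    if (fromCountJob != null) {
      return fromCountJob;
    }
    long total = 0L;
    for (ExtendedBlocklet blocklet : job.execute(format)) {
      total += blocklet.getRowCount();
    }
    return total;
  }
}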
@@ -230,7 +230,7 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
List<String> segmentsToBeRefreshed) throws IOException {
return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments,
invalidSegments, level, false, segmentsToBeRefreshed);
invalidSegments, level, false, segmentsToBeRefreshed, false);
}

/**
@@ -241,7 +241,8 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune,
List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
Boolean isFallbackJob, List<String> segmentsToBeRefreshed) throws IOException {
Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob)
throws IOException {
List<String> invalidSegmentNo = new ArrayList<>();
for (Segment segment : invalidSegments) {
invalidSegmentNo.add(segment.getSegmentNo());
@@ -250,9 +251,11 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
DistributableDataMapFormat dataMapFormat =
new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
partitionsToPrune, false, level, isFallbackJob);
List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat);
// Apply expression on the blocklets.
return prunedBlocklets;
if (isCountJob) {
dataMapFormat.setCountStarJob();
dataMapFormat.setIsWriteToFile(false);
}
return dataMapJob.execute(dataMapFormat);
}

public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
@@ -28,7 +28,6 @@

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -91,6 +90,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl

private boolean isWriteToFile = true;

private boolean isCountStarJob = false;

DistributableDataMapFormat() {

}
@@ -103,7 +104,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
this.dataMapToClear = dataMapToClear;
}

DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
public DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
List<Segment> validSegments, List<String> invalidSegments, List<PartitionSpec> partitions,
boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob)
throws IOException {
@@ -136,7 +137,6 @@ public RecordReader<Void, ExtendedBlocklet> createRecordReader(InputSplit inputS
return new RecordReader<Void, ExtendedBlocklet>() {
private Iterator<ExtendedBlocklet> blockletIterator;
private ExtendedBlocklet currBlocklet;
private List<DataMap> dataMaps;

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -149,7 +149,6 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
if (dataMapLevel == null) {
TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
.getDataMap(table, distributable.getDistributable().getDataMapSchema());
dataMaps = defaultDataMap.getTableDataMaps(distributable.getDistributable());
blocklets = defaultDataMap
.prune(segmentsToLoad, new DataMapFilter(filterResolverIntf), partitions);
blocklets = DataMapUtil
@@ -192,11 +191,6 @@ public float getProgress() throws IOException, InterruptedException {

@Override
public void close() throws IOException {
if (null != dataMaps) {
for (DataMap dataMap : dataMaps) {
dataMap.finish();
}
}
}
};
}
@@ -247,6 +241,7 @@ public void write(DataOutput out) throws IOException {
out.writeUTF(taskGroupDesc);
out.writeUTF(queryId);
out.writeBoolean(isWriteToFile);
out.writeBoolean(isCountStarJob);
}

@Override
@@ -292,6 +287,7 @@ public void readFields(DataInput in) throws IOException {
this.taskGroupDesc = in.readUTF();
this.queryId = in.readUTF();
this.isWriteToFile = in.readBoolean();
this.isCountStarJob = in.readBoolean();
}

private void initReadCommittedScope() throws IOException {
@@ -398,9 +394,29 @@ public List<String> getValidSegmentIds() {
return validSegments;
}

public List<Segment> getValidSegments() {
return validSegments;
}

public void createDataMapChooser() throws IOException {
if (null != filterResolverIntf) {
this.dataMapChooser = new DataMapChooser(table);
}
}

public void setCountStarJob() {
this.isCountStarJob = true;
}

public boolean isCountStarJob() {
return this.isCountStarJob;
}

public List<PartitionSpec> getPartitions() {
return partitions;
}

public ReadCommittedScope getReadCommittedScope() {
return readCommittedScope;
}
}
@@ -39,6 +39,10 @@ public class ExtendedBlocklet extends Blocklet {

private CarbonInputSplit inputSplit;

private Long count;

private String segmentNo;

public ExtendedBlocklet() {

}
@@ -78,6 +82,9 @@ public long getLength() {
}

public String getSegmentId() {
if (segmentNo != null) {
return segmentNo;
}
return this.inputSplit.getSegmentId();
}

@@ -92,8 +99,12 @@ public String getPath() {
return getFilePath();
}

public String getDataMapWriterPath() {
return this.inputSplit.getDataMapWritePath();
public Long getRowCount() {
if (count != null) {
return count;
} else {
return (long) inputSplit.getRowCount();
}
}

public void setDataMapWriterPath(String dataMapWriterPath) {
@@ -161,30 +172,35 @@ public void setColumnSchema(List<ColumnSchema> columnSchema) {
* @param uniqueLocation
* @throws IOException
*/
public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob)
throws IOException {
super.write(out);
if (dataMapUniqueId == null) {
out.writeBoolean(false);
if (isCountJob) {
out.writeLong(inputSplit.getRowCount());
out.writeUTF(inputSplit.getSegmentId());
} else {
out.writeBoolean(true);
out.writeUTF(dataMapUniqueId);
}
out.writeBoolean(inputSplit != null);
if (inputSplit != null) {
// creating byte array output stream to get the size of input split serializeData size
ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(ebos);
inputSplit.setFilePath(null);
inputSplit.setBucketId(null);
if (inputSplit.isBlockCache()) {
inputSplit.updateFooteroffset();
inputSplit.updateBlockLength();
inputSplit.setWriteDetailInfo(false);
if (dataMapUniqueId == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(dataMapUniqueId);
}
out.writeBoolean(inputSplit != null);
if (inputSplit != null) {
// creating byte array output stream to get the size of input split serializeData size
ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(ebos);
inputSplit.setFilePath(null);
inputSplit.setBucketId(null);
if (inputSplit.isBlockCache()) {
inputSplit.updateFooteroffset();
inputSplit.updateBlockLength();
inputSplit.setWriteDetailInfo(false);
}
inputSplit.serializeFields(dos, uniqueLocation);
out.writeInt(ebos.size());
out.write(ebos.getBuffer(), 0, ebos.size());
}
inputSplit.serializeFields(dos, uniqueLocation);
out.writeInt(ebos.size());
out.write(ebos.getBuffer(), 0 , ebos.size());
}
}

@@ -195,9 +211,15 @@ public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
* @param tablePath
* @throws IOException
*/
public void deserializeFields(DataInput in, String[] locations, String tablePath)
public void deserializeFields(DataInput in, String[] locations, String tablePath,
boolean isCountJob)
throws IOException {
super.readFields(in);
if (isCountJob) {
count = in.readLong();
segmentNo = in.readUTF();
return;
}
if (in.readBoolean()) {
dataMapUniqueId = in.readUTF();
}
@@ -56,25 +56,21 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {

private static final Logger LOGGER =
LogServiceFactory.getLogService(ExtendedBlockletWrapper.class.getName());

private static final int BUFFER_SIZE = 8 * 1024 * 1024;
private static final int BLOCK_SIZE = 256 * 1024 * 1024;
private boolean isWrittenToFile;

private int dataSize;

private byte[] bytes;

private static final int BUFFER_SIZE = 8 * 1024 * 1024;

private static final int BLOCK_SIZE = 256 * 1024 * 1024;
private long count;

public ExtendedBlockletWrapper() {

}

public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, String tablePath,
String queryId, boolean isWriteToFile) {
String queryId, boolean isWriteToFile, boolean isCountJob) {
Map<String, Short> uniqueLocations = new HashMap<>();
byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList);
byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList, isCountJob);
int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024;
@@ -122,13 +118,13 @@ public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, Stri
}

private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocations,
List<ExtendedBlocklet> extendedBlockletList) {
List<ExtendedBlocklet> extendedBlockletList, boolean isCountJob) {
ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
DataOutputStream stream = new DataOutputStream(bos);
try {
for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) {
extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
extendedBlocklet.serializeData(stream, uniqueLocations);
extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob);
}
return new SnappyCompressor().compressByte(bos.toByteArray());
} catch (IOException e) {
@@ -142,6 +138,7 @@ private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocatio
* Below method will be used to write the data to stream[file/memory]
* Data Format
* <number of splits><number of unique location[short]><locations><serialize data len><data>
*
* @param stream
* @param data
* @param uniqueLocation
@@ -158,7 +155,7 @@ private void writeBlockletToStream(DataOutputStream stream, byte[] data,
final Map.Entry<String, Short> next = iterator.next();
uniqueLoc[next.getValue()] = next.getKey();
}
stream.writeShort((short)uniqueLoc.length);
stream.writeShort((short) uniqueLoc.length);
for (String loc : uniqueLoc) {
stream.writeUTF(loc);
}
@@ -170,12 +167,14 @@
* deserialize the blocklet data from file or stream
* data format
* <number of splits><number of unique location[short]><locations><serialize data len><data>
*
* @param tablePath
* @param queryId
* @return
* @throws IOException
*/
public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) throws IOException {
public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId, boolean isCountJob)
throws IOException {
byte[] data;
if (bytes != null) {
if (isWrittenToFile) {
@@ -218,7 +217,7 @@ public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) thr
try {
for (int i = 0; i < numberOfBlocklet; i++) {
ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet();
extendedBlocklet.deserializeFields(eDIS, locations, tablePath);
extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob);
extendedBlockletList.add(extendedBlocklet);
}
} finally {
@@ -248,4 +247,8 @@ public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) thr
}
this.dataSize = in.readInt();
}

public long getCount() {
return count;
}
}
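Putting the wrapper changes together, a hypothetical round trip under the new isCountJob flag might look as follows (the helper class, its import paths, and the parameter values are assumptions for illustration, not part of this commit): the serializing side packages pruned blocklets without spilling them to file, and the reading side gets back blocklets that carry only a row count and segment number.

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapper;

// Hypothetical round-trip sketch; not part of this commit.
public final class CountJobWrapperRoundTripSketch {

  // Serialize pruned blocklets for a count(*) request and read them back;
  // for count jobs only the row count and segment id survive the trip.
  public static long roundTrip(List<ExtendedBlocklet> prunedBlocklets,
      String tablePath, String queryId) throws IOException {
    ExtendedBlockletWrapper wrapper = new ExtendedBlockletWrapper(
        prunedBlocklets, tablePath, queryId,
        false /* isWriteToFile */, true /* isCountJob */);
    long total = 0L;
    for (ExtendedBlocklet blocklet : wrapper.readBlocklet(tablePath, queryId, true)) {
      total += blocklet.getRowCount();
    }
    return total;
  }
}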
