optimized index server output for count(*)
kunal642 committed Jun 27, 2019
1 parent 473afdd commit f5abf81
Showing 10 changed files with 124 additions and 68 deletions.
Changed file: DataMapUtil
@@ -219,6 +219,8 @@ static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
return blocklets;
}



/**
* this method gets the datamapJob and call execute of that job, this will be launched for
* distributed CG or FG
@@ -229,7 +231,7 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
List<String> segmentsToBeRefreshed) throws IOException {
return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments,
invalidSegments, level, false, segmentsToBeRefreshed);
invalidSegments, level, false, segmentsToBeRefreshed, false);
}

/**
@@ -240,7 +242,8 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune,
List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
Boolean isFallbackJob, List<String> segmentsToBeRefreshed) throws IOException {
Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob)
throws IOException {
List<String> invalidSegmentNo = new ArrayList<>();
for (Segment segment : invalidSegments) {
invalidSegmentNo.add(segment.getSegmentNo());
@@ -249,9 +252,11 @@ public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
DistributableDataMapFormat dataMapFormat =
new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
partitionsToPrune, false, level, isFallbackJob);
List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat);
// Apply expression on the blocklets.
return prunedBlocklets;
if (isCountJob) {
dataMapFormat.setCountStarJob();
dataMapFormat.setIsWriteToFile(false);
}
return dataMapJob.execute(dataMapFormat);
}

public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
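Taken together, the new overload lets a caller opt in to the lightweight count(*) output: when isCountJob is true, the DistributableDataMapFormat is flagged as a count(*) job and its result is no longer written to a file, since only row counts and segment ids come back. A minimal sketch of such a call (illustrative only; carbonTable, filterResolver, dataMapJob and the segment lists are assumed to come from the caller's context):

    List<ExtendedBlocklet> prunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable,
        filterResolver, dataMapJob, partitionsToPrune, validSegments, invalidSegments,
        null /* DataMapLevel */, false /* isFallbackJob */, segmentsToBeRefreshed,
        true /* isCountJob */);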
Changed file: DistributableDataMapFormat
@@ -87,6 +87,9 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
private String queryId = "";

private boolean isWriteToFile = true;

private boolean isCountStarJob = false;

DistributableDataMapFormat() {

}
@@ -399,4 +402,12 @@ public boolean isWriteToFile() {
public void setFallbackJob() {
isFallbackJob = true;
}

public void setCountStarJob() {
this.isCountStarJob = true;
}

public boolean isCountStarJob() {
return this.isCountStarJob;
}
}
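The two flags are meant to travel together for a count(*) request: setCountStarJob() marks the pruning job, and setIsWriteToFile(false) keeps the (now tiny) response in memory instead of spilling it to a file. A hypothetical executor-side sketch of how a pruning task could forward them into the response wrapper that is changed later in this commit (buildResponse, tablePath, queryId and prunedBlocklets are placeholders, not code from this diff):

    private ExtendedBlockletWrapper buildResponse(DistributableDataMapFormat format,
        List<ExtendedBlocklet> prunedBlocklets, String tablePath, String queryId) {
      // For a count(*) job each blocklet carries only a row count and a segment id, so the
      // wrapper is cheap to return inline (isWriteToFile was already forced to false).
      return new ExtendedBlockletWrapper(prunedBlocklets, tablePath, queryId,
          format.isWriteToFile(), format.isCountStarJob());
    }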
Changed file: ExtendedBlocklet
@@ -39,6 +39,10 @@ public class ExtendedBlocklet extends Blocklet {

private CarbonInputSplit inputSplit;

private Long count;

private String segmentNo;

public ExtendedBlocklet() {

}
@@ -78,6 +82,9 @@ public long getLength() {
}

public String getSegmentId() {
if (segmentNo != null) {
return segmentNo;
}
return this.inputSplit.getSegmentId();
}

@@ -96,6 +103,14 @@ public String getDataMapWriterPath() {
return this.inputSplit.getDataMapWritePath();
}

public Long getRowCount() {
if (count != null) {
return count;
} else {
return (long) inputSplit.getRowCount();
}
}
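For blocklets deserialized from a count(*) response, getSegmentId() and getRowCount() are served from the new count/segmentNo fields; for everything else they still fall back to the wrapped CarbonInputSplit. A small sketch of how a caller might fold the pruned output into a per-segment row count (illustrative helper, not part of this diff; assumes the usual java.util imports):

    static Map<String, Long> rowCountBySegment(List<ExtendedBlocklet> blocklets) {
      Map<String, Long> rowsPerSegment = new HashMap<>();
      for (ExtendedBlocklet blocklet : blocklets) {
        Long current = rowsPerSegment.get(blocklet.getSegmentId());
        long rows = blocklet.getRowCount();
        rowsPerSegment.put(blocklet.getSegmentId(), current == null ? rows : current + rows);
      }
      return rowsPerSegment;
    }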

public void setDataMapWriterPath(String dataMapWriterPath) {
this.inputSplit.setDataMapWritePath(dataMapWriterPath);
}
@@ -161,30 +176,35 @@ public void setColumnSchema(List<ColumnSchema> columnSchema) {
* @param uniqueLocation
* @throws IOException
*/
public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob)
throws IOException {
super.write(out);
if (dataMapUniqueId == null) {
out.writeBoolean(false);
if (isCountJob) {
out.writeLong(inputSplit.getRowCount());
out.writeUTF(inputSplit.getSegmentId());
} else {
out.writeBoolean(true);
out.writeUTF(dataMapUniqueId);
}
out.writeBoolean(inputSplit != null);
if (inputSplit != null) {
// creating byte array output stream to get the size of input split serializeData size
ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(ebos);
inputSplit.setFilePath(null);
inputSplit.setBucketId(null);
if (inputSplit.isBlockCache()) {
inputSplit.updateFooteroffset();
inputSplit.updateBlockLength();
inputSplit.setWriteDetailInfo(false);
if (dataMapUniqueId == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(dataMapUniqueId);
}
out.writeBoolean(inputSplit != null);
if (inputSplit != null) {
// creating byte array output stream to get the size of input split serializeData size
ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(ebos);
inputSplit.setFilePath(null);
inputSplit.setBucketId(null);
if (inputSplit.isBlockCache()) {
inputSplit.updateFooteroffset();
inputSplit.updateBlockLength();
inputSplit.setWriteDetailInfo(false);
}
inputSplit.serializeFields(dos, uniqueLocation);
out.writeInt(ebos.size());
out.write(ebos.getBuffer(), 0, ebos.size());
}
inputSplit.serializeFields(dos, uniqueLocation);
out.writeInt(ebos.size());
out.write(ebos.getBuffer(), 0 , ebos.size());
}
}
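The count(*) branch is what shrinks the index server output: after the base Blocklet fields, only a long row count and a UTF segment id are written per blocklet, while the regular branch keeps the old layout (optional dataMapUniqueId, then the optional length-prefixed CarbonInputSplit payload). Note the layout is not self-describing, so the reader must be told which branch was used; the wrapper threads that flag through. A stripped-down illustration of the count(*) entry (helper name is invented; assumes java.io imports):

    // count(*) job : <Blocklet fields><long rowCount><UTF segmentId>
    // regular job  : <Blocklet fields><boolean hasUniqueId>[<UTF uniqueId>]
    //                <boolean hasSplit>[<int length><serialized CarbonInputSplit>]
    static void writeCountEntry(DataOutput out, ExtendedBlocklet blocklet) throws IOException {
      out.writeLong(blocklet.getRowCount());   // read back as count = in.readLong()
      out.writeUTF(blocklet.getSegmentId());   // read back as segmentNo = in.readUTF()
    }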

@@ -195,9 +215,15 @@ public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
* @param tablePath
* @throws IOException
*/
public void deserializeFields(DataInput in, String[] locations, String tablePath)
public void deserializeFields(DataInput in, String[] locations, String tablePath,
boolean isCountJob)
throws IOException {
super.readFields(in);
if (isCountJob) {
count = in.readLong();
segmentNo = in.readUTF();
return;
}
if (in.readBoolean()) {
dataMapUniqueId = in.readUTF();
}
Changed file: ExtendedBlockletWrapper
@@ -56,25 +56,20 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {

private static final Logger LOGGER =
LogServiceFactory.getLogService(ExtendedBlockletWrapper.class.getName());

private static final int BUFFER_SIZE = 8 * 1024 * 1024;
private static final int BLOCK_SIZE = 256 * 1024 * 1024;
private boolean isWrittenToFile;

private int dataSize;

private byte[] bytes;

private static final int BUFFER_SIZE = 8 * 1024 * 1024;

private static final int BLOCK_SIZE = 256 * 1024 * 1024;

public ExtendedBlockletWrapper() {

}

public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, String tablePath,
String queryId, boolean isWriteToFile) {
String queryId, boolean isWriteToFile, boolean isCountJob) {
Map<String, Short> uniqueLocations = new HashMap<>();
byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList);
byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList, isCountJob);
int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024;
@@ -92,8 +87,8 @@ public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, Stri
isFolderExists = false;
}
if (isFolderExists) {
stream = FileFactory.getDataOutputStream(folderPath + "/" + fileName,
FileFactory.getFileType(folderPath),
stream = FileFactory
.getDataOutputStream(folderPath + "/" + fileName, FileFactory.getFileType(folderPath),
BUFFER_SIZE, BLOCK_SIZE, (short) 1);
writeBlockletToStream(stream, bytes, uniqueLocations, extendedBlockletList);
this.dataSize = stream.size();
@@ -122,13 +117,13 @@ public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, Stri
}

private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocations,
List<ExtendedBlocklet> extendedBlockletList) {
List<ExtendedBlocklet> extendedBlockletList, boolean isCountJob) {
ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
DataOutputStream stream = new DataOutputStream(bos);
try {
for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) {
extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
extendedBlocklet.serializeData(stream, uniqueLocations);
extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob);
}
return new SnappyCompressor().compressByte(bos.toByteArray());
} catch (IOException e) {
@@ -142,6 +137,7 @@ private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocatio
* Below method will be used to write the data to stream[file/memory]
* Data Format
* <number of splits><number of unique location[short]><locations><serialize data len><data>
*
* @param stream
* @param data
* @param uniqueLocation
@@ -158,7 +154,7 @@ private void writeBlockletToStream(DataOutputStream stream, byte[] data,
final Map.Entry<String, Short> next = iterator.next();
uniqueLoc[next.getValue()] = next.getKey();
}
stream.writeShort((short)uniqueLoc.length);
stream.writeShort((short) uniqueLoc.length);
for (String loc : uniqueLoc) {
stream.writeUTF(loc);
}
@@ -170,12 +166,14 @@
* deseralize the blocklet data from file or stream
* data format
* <number of splits><number of unique location[short]><locations><serialize data len><data>
*
* @param tablePath
* @param queryId
* @return
* @throws IOException
*/
public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) throws IOException {
public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId, boolean isCountJob)
throws IOException {
byte[] data;
if (bytes != null) {
if (isWrittenToFile) {
@@ -211,14 +209,14 @@ public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) thr
}

final byte[] unCompressByte =
new SnappyCompressor().unCompressByte(data, data.length - actualDataLen, actualDataLen);
new SnappyCompressor().unCompressByte(data, this.dataSize - actualDataLen, actualDataLen);
ExtendedByteArrayInputStream ebis = new ExtendedByteArrayInputStream(unCompressByte);
ExtendedDataInputStream eDIS = new ExtendedDataInputStream(ebis);
List<ExtendedBlocklet> extendedBlockletList = new ArrayList<>();
try {
for (int i = 0; i < numberOfBlocklet; i++) {
ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet();
extendedBlocklet.deserializeFields(eDIS, locations, tablePath);
extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob);
extendedBlockletList.add(extendedBlocklet);
}
} finally {
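On the reading side the flag has to round-trip: readBlocklet forwards isCountJob into deserializeFields for every blocklet, so the caller must pass the same value that was used when the wrapper was built, otherwise the reader would look for a split payload that was never written. Because isWriteToFile is already forced to false for count jobs, the Snappy-compressed payload stays inline in the response rather than being read back from a file. A sketch of the driver-side read for a count(*) response (wrapper, tablePath and queryId are placeholders for values the index server client already has):

    List<ExtendedBlocklet> blocklets = wrapper.readBlocklet(tablePath, queryId, true /* isCountJob */);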
Changed file: ExtendedBlockletWrapperContainer
@@ -62,8 +62,8 @@ public ExtendedBlockletWrapperContainer(ExtendedBlockletWrapper[] extendedBlockl
this.isFallbackJob = isFallbackJob;
}

public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String queryId)
throws IOException {
public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String queryId,
boolean isCountJob) throws IOException {
if (!isFallbackJob) {
int numOfThreads = CarbonProperties.getNumOfThreadsForPruning();
ExecutorService executorService = Executors
@@ -85,8 +85,8 @@ public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String query
List<Future<List<ExtendedBlocklet>>> futures = new ArrayList<>();
for (int i = 0; i < split.length; i++) {
end += split[i];
futures.add(executorService
.submit(new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId)));
futures.add(executorService.submit(
new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId, isCountJob)));
start += split[i];
}
executorService.shutdown();
@@ -109,7 +109,8 @@ public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String query
} else {
List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
for (ExtendedBlockletWrapper extendedBlockletWrapper: extendedBlockletWrappers) {
extendedBlocklets.addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId));
extendedBlocklets
.addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId, isCountJob));
}
return extendedBlocklets;
}
@@ -125,18 +126,22 @@ private class ExtendedBlockletDeserializerThread implements Callable<List<Extend

private String queryId;

private boolean isCountJob;

public ExtendedBlockletDeserializerThread(int start, int end, String tablePath,
String queryId) {
String queryId, boolean isCountJob) {
this.start = start;
this.end = end;
this.tablePath = tablePath;
this.queryId = queryId;
this.isCountJob = isCountJob;
}

@Override public List<ExtendedBlocklet> call() throws Exception {
List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
for (int i = start; i < end; i++) {
extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId));
extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId,
isCountJob));
}
return extendedBlocklets;
}
Changed file: CarbonInputFormat
@@ -412,10 +412,18 @@ public static ReadCommittedScope getReadCommittedScope(Configuration configurati
*/
@Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException;

List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
private List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
List<Segment> validSegments, List<Segment> invalidSegments,
List<String> segmentsToBeRefreshed) throws IOException {
return getDistributedSplit(table, filterResolverIntf, partitionNames, validSegments,
invalidSegments, segmentsToBeRefreshed, false);
}

List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
List<Segment> validSegments, List<Segment> invalidSegments,
List<String> segmentsToBeRefreshed, boolean isCountJob) throws IOException {
try {
DataMapJob dataMapJob =
(DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
@@ -424,18 +432,17 @@ List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
}
return DataMapUtil
.executeDataMapJob(table, filterResolverIntf, dataMapJob, partitionNames, validSegments,
invalidSegments, null, segmentsToBeRefreshed);
invalidSegments, null, false, segmentsToBeRefreshed, isCountJob);
} catch (Exception e) {
// Check if fallback is disabled for testing purposes then directly throw exception.
if (CarbonProperties.getInstance().isFallBackDisabled()) {
throw e;
}
LOG.error("Exception occurred while getting splits using index server. Initiating Fall "
+ "back to embedded mode", e);
return DataMapUtil
.executeDataMapJob(table, filterResolverIntf,
DataMapUtil.getEmbeddedJob(), partitionNames, validSegments, invalidSegments, null,
true, segmentsToBeRefreshed);
return DataMapUtil.executeDataMapJob(table, filterResolverIntf,
DataMapUtil.getEmbeddedJob(), partitionNames, validSegments,
invalidSegments, null, true, segmentsToBeRefreshed, isCountJob);
}
}
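With the flag plumbed through, a count(*) query no longer needs real splits at all: prune via the index server (falling back to the embedded job on failure, as above) and add up the per-blocklet row counts. A hypothetical helper along these lines, not part of this diff, shows the intended use; it would live wherever getDistributedSplit is visible, for example in this input format class:

    long getDistributedRowCount(CarbonTable table, FilterResolverIntf filter,
        List<PartitionSpec> partitions, List<Segment> validSegments) throws IOException {
      List<ExtendedBlocklet> blocklets = getDistributedSplit(table, filter, partitions,
          validSegments, new ArrayList<Segment>(), new ArrayList<String>(), true /* isCountJob */);
      long totalRows = 0;
      for (ExtendedBlocklet blocklet : blocklets) {
        totalRows += blocklet.getRowCount();
      }
      return totalRows;
    }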

(diffs for the remaining changed files are not shown)
