diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
index ed3ecc9c343..52b187cecfb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
@@ -30,4 +30,8 @@ public abstract class AbstractDataMapJob implements DataMapJob {
   @Override public void execute(CarbonTable carbonTable,
       FileInputFormat format) {
   }
+
+  @Override public Long executeCountJob(DistributableDataMapFormat dataMapFormat) {
+    return null;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
index 9eafe7c635d..326282dc2ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
@@ -35,4 +35,6 @@ public interface DataMapJob extends Serializable {
 
   List execute(DistributableDataMapFormat dataMapFormat);
 
+  Long executeCountJob(DistributableDataMapFormat dataMapFormat);
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index dd9debc1bc9..bca7409ffb0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -230,7 +230,7 @@ public static List executeDataMapJob(CarbonTable carbonTable,
       List validSegments, List invalidSegments, DataMapLevel level,
       List segmentsToBeRefreshed) throws IOException {
     return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments,
-        invalidSegments, level, false, segmentsToBeRefreshed);
+        invalidSegments, level, false, segmentsToBeRefreshed, false);
   }
 
   /**
@@ -241,7 +241,8 @@ public static List executeDataMapJob(CarbonTable carbonTable,
   public static List executeDataMapJob(CarbonTable carbonTable, FilterResolverIntf resolver,
       DataMapJob dataMapJob, List partitionsToPrune, List validSegments,
       List invalidSegments, DataMapLevel level,
-      Boolean isFallbackJob, List segmentsToBeRefreshed) throws IOException {
+      Boolean isFallbackJob, List segmentsToBeRefreshed, boolean isCountJob)
+      throws IOException {
     List invalidSegmentNo = new ArrayList<>();
     for (Segment segment : invalidSegments) {
       invalidSegmentNo.add(segment.getSegmentNo());
@@ -250,9 +251,11 @@ public static List executeDataMapJob(CarbonTable carbonTable,
     DistributableDataMapFormat dataMapFormat =
         new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
             partitionsToPrune, false, level, isFallbackJob);
-    List prunedBlocklets = dataMapJob.execute(dataMapFormat);
-    // Apply expression on the blocklets.
-    return prunedBlocklets;
+    if (isCountJob) {
+      dataMapFormat.setCountStarJob();
+      dataMapFormat.setIsWriteToFile(false);
+    }
+    return dataMapJob.execute(dataMapFormat);
   }
 
   public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
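[Illustrative note, not part of the patch] A minimal sketch of how the count(*) path added above is meant to be driven: the caller builds a DistributableDataMapFormat, flags it as a count-star job whose result stays in memory instead of being written to a temporary file, and asks the DataMapJob for a single Long instead of pruned blocklets. The helper name countStar and the way its arguments are obtained are hypothetical; the constructor call, setCountStarJob(), setIsWriteToFile(false) and executeCountJob() mirror the patch.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

public class CountJobSketch {
  // Hypothetical helper: the caller supplies the table, valid segments, partitions and job.
  static Long countStar(CarbonTable table, List<Segment> validSegments,
      List<PartitionSpec> partitions, DataMapJob dataMapJob) throws IOException {
    // No filter resolver, no invalid segments, no datamap level: count(*) only needs row counts.
    DistributableDataMapFormat format = new DistributableDataMapFormat(table, null,
        validSegments, new ArrayList<String>(), partitions, false, null, false);
    format.setCountStarJob();        // switch blocklet serialization to the compact count format
    format.setIsWriteToFile(false);  // keep the result in memory, do not spill to a temp file
    return dataMapJob.executeCountJob(format);
  }
}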
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 8426fcb9344..b430c5d4651 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -28,7 +28,6 @@
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -91,6 +90,8 @@ public class DistributableDataMapFormat extends FileInputFormat
 
+  private boolean isCountStarJob = false;
+
   DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
       List validSegments, List invalidSegments, List partitions, boolean isJobToClearDataMaps,
       DataMapLevel dataMapLevel, boolean isFallbackJob) throws IOException {
@@ -136,7 +137,6 @@ public RecordReader createRecordReader(InputSplit inputS
     return new RecordReader() {
       private Iterator blockletIterator;
       private ExtendedBlocklet currBlocklet;
-      private List dataMaps;
 
       @Override
       public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -149,7 +149,6 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
         if (dataMapLevel == null) {
           TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
               .getDataMap(table, distributable.getDistributable().getDataMapSchema());
-          dataMaps = defaultDataMap.getTableDataMaps(distributable.getDistributable());
           blocklets = defaultDataMap
               .prune(segmentsToLoad, new DataMapFilter(filterResolverIntf), partitions);
           blocklets = DataMapUtil
@@ -192,11 +191,6 @@ public float getProgress() throws IOException, InterruptedException {
 
       @Override
       public void close() throws IOException {
-        if (null != dataMaps) {
-          for (DataMap dataMap : dataMaps) {
-            dataMap.finish();
-          }
-        }
       }
     };
   }
@@ -247,6 +241,7 @@ public void write(DataOutput out) throws IOException {
     out.writeUTF(taskGroupDesc);
     out.writeUTF(queryId);
     out.writeBoolean(isWriteToFile);
+    out.writeBoolean(isCountStarJob);
   }
 
   @Override
@@ -292,6 +287,7 @@ public void readFields(DataInput in) throws IOException {
     this.taskGroupDesc = in.readUTF();
     this.queryId = in.readUTF();
     this.isWriteToFile = in.readBoolean();
+    this.isCountStarJob = in.readBoolean();
   }
 
   private void initReadCommittedScope() throws IOException {
@@ -398,9 +394,29 @@ public List getValidSegmentIds() {
     return validSegments;
   }
 
+  public List getValidSegments() {
+    return validSegments;
+  }
+
   public void createDataMapChooser() throws IOException {
     if (null != filterResolverIntf) {
       this.dataMapChooser = new DataMapChooser(table);
     }
   }
+
+  public void setCountStarJob() {
+    this.isCountStarJob = true;
+  }
+
+  public boolean isCountStarJob() {
+    return this.isCountStarJob;
+  }
+
+  public List getPartitions() {
+    return partitions;
+  }
+
+  public ReadCommittedScope getReadCommittedScope() {
+    return readCommittedScope;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index a85423babed..611e9697e06 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -39,6 +39,10 @@ public class ExtendedBlocklet extends Blocklet {
 
   private CarbonInputSplit inputSplit;
 
+  private Long count;
+
+  private String segmentNo;
+
   public ExtendedBlocklet() {
   }
 
@@ -78,6 +82,9 @@ public long getLength() {
   }
 
   public String getSegmentId() {
+    if (segmentNo != null) {
+      return segmentNo;
+    }
     return this.inputSplit.getSegmentId();
   }
 
@@ -92,8 +99,12 @@ public String getPath() {
     return getFilePath();
   }
 
-  public String getDataMapWriterPath() {
-    return this.inputSplit.getDataMapWritePath();
+  public Long getRowCount() {
+    if (count != null) {
+      return count;
+    } else {
+      return (long) inputSplit.getRowCount();
+    }
   }
 
   public void setDataMapWriterPath(String dataMapWriterPath) {
@@ -161,30 +172,35 @@ public void setColumnSchema(List columnSchema) {
    * @param uniqueLocation
    * @throws IOException
    */
-  public void serializeData(DataOutput out, Map uniqueLocation)
+  public void serializeData(DataOutput out, Map uniqueLocation, boolean isCountJob)
       throws IOException {
     super.write(out);
-    if (dataMapUniqueId == null) {
-      out.writeBoolean(false);
+    if (isCountJob) {
+      out.writeLong(inputSplit.getRowCount());
+      out.writeUTF(inputSplit.getSegmentId());
     } else {
-      out.writeBoolean(true);
-      out.writeUTF(dataMapUniqueId);
-    }
-    out.writeBoolean(inputSplit != null);
-    if (inputSplit != null) {
-      // creating byte array output stream to get the size of input split serializeData size
-      ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
-      DataOutputStream dos = new DataOutputStream(ebos);
-      inputSplit.setFilePath(null);
-      inputSplit.setBucketId(null);
-      if (inputSplit.isBlockCache()) {
-        inputSplit.updateFooteroffset();
-        inputSplit.updateBlockLength();
-        inputSplit.setWriteDetailInfo(false);
+      if (dataMapUniqueId == null) {
+        out.writeBoolean(false);
+      } else {
+        out.writeBoolean(true);
+        out.writeUTF(dataMapUniqueId);
+      }
+      out.writeBoolean(inputSplit != null);
+      if (inputSplit != null) {
+        // creating byte array output stream to get the size of input split serializeData size
+        ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(ebos);
+        inputSplit.setFilePath(null);
+        inputSplit.setBucketId(null);
+        if (inputSplit.isBlockCache()) {
+          inputSplit.updateFooteroffset();
+          inputSplit.updateBlockLength();
+          inputSplit.setWriteDetailInfo(false);
+        }
+        inputSplit.serializeFields(dos, uniqueLocation);
+        out.writeInt(ebos.size());
+        out.write(ebos.getBuffer(), 0, ebos.size());
       }
-      inputSplit.serializeFields(dos, uniqueLocation);
-      out.writeInt(ebos.size());
-      out.write(ebos.getBuffer(), 0 , ebos.size());
     }
   }
 
@@ -195,9 +211,15 @@ public void serializeData(DataOutput out, Map uniqueLocation)
    * @param tablePath
    * @throws IOException
    */
-  public void deserializeFields(DataInput in, String[] locations, String tablePath)
+  public void deserializeFields(DataInput in, String[] locations, String tablePath,
+      boolean isCountJob)
       throws IOException {
     super.readFields(in);
+    if (isCountJob) {
+      count = in.readLong();
+      segmentNo = in.readUTF();
+      return;
+    }
     if (in.readBoolean()) {
       dataMapUniqueId = in.readUTF();
     }
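[Illustrative note, not part of the patch] The count-star branch of serializeData/deserializeFields above reduces each blocklet to a row count (long) followed by its segment id (UTF string), instead of serializing the whole CarbonInputSplit. A rough, self-contained round trip of just that payload, ignoring the Blocklet header written by super.write(out); the values are made up:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CountFormatSketch {
  public static void main(String[] args) throws IOException {
    // Write the two fields the count-star branch of serializeData emits per blocklet.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeLong(42_000L); // row count of one block/blocklet
    out.writeUTF("2");      // segment number
    out.flush();

    // Read it back the way the count-star branch of deserializeFields does.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    long rowCount = in.readLong();
    String segmentNo = in.readUTF();
    System.out.println("segment " + segmentNo + " -> " + rowCount + " rows");
  }
}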
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
index ab051ea35a4..eab6afbd86b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
@@ -56,25 +56,21 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
 
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(ExtendedBlockletWrapper.class.getName());
-
+  private static final int BUFFER_SIZE = 8 * 1024 * 1024;
+  private static final int BLOCK_SIZE = 256 * 1024 * 1024;
   private boolean isWrittenToFile;
-
   private int dataSize;
-
   private byte[] bytes;
-
-  private static final int BUFFER_SIZE = 8 * 1024 * 1024;
-
-  private static final int BLOCK_SIZE = 256 * 1024 * 1024;
+  private long count;
 
   public ExtendedBlockletWrapper() {
   }
 
   public ExtendedBlockletWrapper(List extendedBlockletList, String tablePath,
-      String queryId, boolean isWriteToFile) {
+      String queryId, boolean isWriteToFile, boolean isCountJob) {
     Map uniqueLocations = new HashMap<>();
-    byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList);
+    byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList, isCountJob);
     int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
             CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024;
@@ -122,13 +118,13 @@ public ExtendedBlockletWrapper(List extendedBlockletList, Stri
 
   private byte[] convertToBytes(String tablePath, Map uniqueLocations,
-      List extendedBlockletList) {
+      List extendedBlockletList, boolean isCountJob) {
     ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
     DataOutputStream stream = new DataOutputStream(bos);
     try {
       for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) {
         extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
-        extendedBlocklet.serializeData(stream, uniqueLocations);
+        extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob);
       }
       return new SnappyCompressor().compressByte(bos.toByteArray());
     } catch (IOException e) {
@@ -142,6 +138,7 @@ private byte[] convertToBytes(String tablePath, Map uniqueLocatio
    * Below method will be used to write the data to stream[file/memory]
    * Data Format
    *
+   *
    * @param stream
    * @param data
    * @param uniqueLocation
@@ -158,7 +155,7 @@ private void writeBlockletToStream(DataOutputStream stream, byte[] data,
       final Map.Entry next = iterator.next();
       uniqueLoc[next.getValue()] = next.getKey();
     }
-    stream.writeShort((short)uniqueLoc.length);
+    stream.writeShort((short) uniqueLoc.length);
     for (String loc : uniqueLoc) {
       stream.writeUTF(loc);
     }
@@ -170,12 +167,14 @@
    * deseralize the blocklet data from file or stream
    * data format
    *
+   *
    * @param tablePath
    * @param queryId
    * @return
    * @throws IOException
    */
-  public List readBlocklet(String tablePath, String queryId) throws IOException {
+  public List readBlocklet(String tablePath, String queryId, boolean isCountJob)
+      throws IOException {
     byte[] data;
     if (bytes != null) {
       if (isWrittenToFile) {
@@ -218,7 +217,7 @@ public List readBlocklet(String tablePath, String queryId) thr
     try {
       for (int i = 0; i < numberOfBlocklet; i++) {
         ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet();
-        extendedBlocklet.deserializeFields(eDIS, locations, tablePath);
+        extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob);
         extendedBlockletList.add(extendedBlocklet);
       }
     } finally {
@@ -248,4 +247,8 @@ public List readBlocklet(String tablePath, String queryId) thr
     }
     this.dataSize = in.readInt();
   }
+
+  public long getCount() {
+    return count;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
index 0c522971fd1..40acf9e0d58 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
@@ -62,8 +62,8 @@ public ExtendedBlockletWrapperContainer(ExtendedBlockletWrapper[] extendedBlockl
     this.isFallbackJob = isFallbackJob;
   }
 
-  public List getExtendedBlockets(String tablePath, String queryId)
-      throws IOException {
+  public List getExtendedBlockets(String tablePath, String queryId,
+      boolean isCountJob) throws IOException {
     if (!isFallbackJob) {
       int numOfThreads = CarbonProperties.getNumOfThreadsForPruning();
       ExecutorService executorService = Executors
@@ -85,8 +85,8 @@ public List getExtendedBlockets(String tablePath, String query
      List<Future<List<ExtendedBlocklet>>> futures = new ArrayList<>();
      for (int i = 0; i < split.length; i++) {
        end += split[i];
-        futures.add(executorService
-            .submit(new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId)));
+        futures.add(executorService.submit(
+            new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId, isCountJob)));
        start += split[i];
      }
      executorService.shutdown();
@@ -109,7 +109,8 @@ public List getExtendedBlockets(String tablePath, String query
     } else {
       List extendedBlocklets = new ArrayList<>();
       for (ExtendedBlockletWrapper extendedBlockletWrapper: extendedBlockletWrappers) {
-        extendedBlocklets.addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId));
+        extendedBlocklets
+            .addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId, isCountJob));
       }
       return extendedBlocklets;
     }
@@ -125,18 +126,22 @@ private class ExtendedBlockletDeserializerThread implements Callable
     public List call() throws Exception {
       List extendedBlocklets = new ArrayList<>();
       for (int i = start; i < end; i++) {
-        extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId));
+        extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId,
+            isCountJob));
       }
       return extendedBlocklets;
     }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index ac9e11ea3ff..56abb91dd38 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -39,6 +39,7 @@
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.DataMapUtil;
+import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
@@ -412,10 +413,41 @@ public static ReadCommittedScope getReadCommittedScope(Configuration configurati
    */
   @Override public abstract List getSplits(JobContext job) throws IOException;
 
+  Long getDistributedCount(CarbonTable table,
+      List partitionNames, List validSegments) throws IOException {
+    DistributableDataMapFormat dataMapFormat =
+        new DistributableDataMapFormat(table, null, validSegments, new ArrayList(),
+            partitionNames, false, null, false);
+    dataMapFormat.setIsWriteToFile(false);
+    try {
+      DataMapJob dataMapJob =
+          (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
+      if (dataMapJob == null) {
+        throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob");
+      }
+      return dataMapJob.executeCountJob(dataMapFormat);
+    } catch (Exception e) {
+      LOG.error("Failed to get count from index server. Initializing fallback", e);
+      DataMapJob dataMapJob = DataMapUtil.getEmbeddedJob();
+      if (dataMapJob == null) {
+        throw new ExceptionInInitializerError("Unable to create " + DataMapUtil.EMBEDDED_JOB_NAME);
+      }
+      return dataMapJob.executeCountJob(dataMapFormat);
+    }
+  }
+
   List getDistributedSplit(CarbonTable table, FilterResolverIntf filterResolverIntf,
       List partitionNames, List validSegments, List invalidSegments,
       List segmentsToBeRefreshed) throws IOException {
+    return getDistributedSplit(table, filterResolverIntf, partitionNames, validSegments,
+        invalidSegments, segmentsToBeRefreshed, false);
+  }
+
+  List getDistributedSplit(CarbonTable table,
+      FilterResolverIntf filterResolverIntf, List partitionNames,
+      List validSegments, List invalidSegments,
+      List segmentsToBeRefreshed, boolean isCountJob) throws IOException {
     try {
       DataMapJob dataMapJob =
           (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
@@ -424,7 +456,7 @@ List getDistributedSplit(CarbonTable table,
       }
       return DataMapUtil
           .executeDataMapJob(table, filterResolverIntf, dataMapJob, partitionNames, validSegments,
-              invalidSegments, null, segmentsToBeRefreshed);
+              invalidSegments, null, false, segmentsToBeRefreshed, isCountJob);
     } catch (Exception e) {
       // Check if fallback is disabled for testing purposes then directly throw exception.
       if (CarbonProperties.getInstance().isFallBackDisabled()) {
@@ -432,10 +464,9 @@ List getDistributedSplit(CarbonTable table,
       }
       LOG.error("Exception occurred while getting splits using index server. Initiating Fall "
           + "back to embedded mode", e);
-      return DataMapUtil
-          .executeDataMapJob(table, filterResolverIntf,
-              DataMapUtil.getEmbeddedJob(), partitionNames, validSegments, invalidSegments, null,
-              true, segmentsToBeRefreshed);
+      return DataMapUtil.executeDataMapJob(table, filterResolverIntf,
+          DataMapUtil.getEmbeddedJob(), partitionNames, validSegments,
+          invalidSegments, null, true, segmentsToBeRefreshed, isCountJob);
     }
   }
 
@@ -578,7 +609,7 @@ private List getPrunedBlocklets(JobContext job, CarbonTable ca
       if (distributedCG && dataMapJob != null) {
         cgPrunedBlocklets = DataMapUtil
             .executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
-                segmentIds, invalidSegments, DataMapLevel.CG, true, new ArrayList());
+                segmentIds, invalidSegments, DataMapLevel.CG, new ArrayList());
       } else {
         cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
       }
@@ -606,7 +637,7 @@ private List getPrunedBlocklets(JobContext job, CarbonTable ca
         // Prune segments from already pruned blocklets
         fgPrunedBlocklets = DataMapUtil
             .executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
-                segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(), true,
+                segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(),
                 new ArrayList());
         // note that the 'fgPrunedBlocklets' has extra datamap related info compared with
         // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
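[Illustrative note, not part of the patch] getDistributedCount and the new getDistributedSplit overload share the same control flow: try the distributed index-server job first, and fall back to the embedded job if it fails. A condensed sketch of that pattern for the count job; the helper name countWithFallback is hypothetical, the calls are the ones used in the hunk above:

import java.io.IOException;

import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapUtil;
import org.apache.carbondata.core.datamap.DistributableDataMapFormat;

public class CountWithFallbackSketch {
  // Hypothetical helper mirroring getDistributedCount(): index server first, embedded on failure.
  static Long countWithFallback(DistributableDataMapFormat format) throws IOException {
    try {
      DataMapJob job = (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
      if (job == null) {
        throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob");
      }
      return job.executeCountJob(format);
    } catch (Exception e) {
      // Embedded mode prunes without the separate index server process.
      DataMapJob embeddedJob = DataMapUtil.getEmbeddedJob();
      if (embeddedJob == null) {
        throw new ExceptionInInitializerError("Unable to create " + DataMapUtil.EMBEDDED_JOB_NAME);
      }
      return embeddedJob.executeCountJob(format);
    }
  }
}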
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 3b7a80097b0..aec0f66132a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -33,6 +33,7 @@
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -562,15 +563,14 @@ segment needs refreshing. same thing need for select count(*) flow also.
       if (CarbonProperties.getInstance()
           .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
         try {
-          List extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
+          List extendedBlocklets =
               getDistributedSplit(table, null, partitions, filteredSegment,
-                  allSegments.getInvalidSegments(), toBeCleanedSegments));
-          for (InputSplit extendedBlocklet : extendedBlocklets) {
-            CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
+                  allSegments.getInvalidSegments(), toBeCleanedSegments);
+          for (ExtendedBlocklet blocklet : extendedBlocklets) {
             String filePath = blocklet.getFilePath().replace("\\", "/");
             String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
             blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName,
-                (long) blocklet.getRowCount());
+                blocklet.getRowCount());
           }
         } catch (Exception e) {
           // Check if fallback is disabled then directly throw exception otherwise try driver
@@ -615,15 +615,11 @@ segment needs refreshing. same thing need for select count(*) flow also.
           }
         }
       } else {
-      long totalRowCount = 0L;
+      long totalRowCount;
       if (CarbonProperties.getInstance()
           .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
-        List extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
-            getDistributedSplit(table, null, partitions, filteredSegment,
-                allSegments.getInvalidSegments(), new ArrayList()));
-        for (InputSplit extendedBlocklet : extendedBlocklets) {
-          totalRowCount += ((CarbonInputSplit) extendedBlocklet).getRowCount();
-        }
+        totalRowCount =
+            getDistributedCount(table, partitions, filteredSegment);
       } else {
         TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
         totalRowCount = defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap);
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 01b88244b32..1fee051d0f9 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -65,7 +65,8 @@ class DistributedDataMapJob extends AbstractDataMapJob {
           dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor)
         dataMapFormat.setFilterResolverIntf(filterInf)
         IndexServer.getClient.getSplits(dataMapFormat)
-          .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId)
+          .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat
+            .getQueryId, dataMapFormat.isCountStarJob)
       } finally {
         val tmpPath = CarbonUtil
           .getIndexServerTempPath(dataMapFormat.getCarbonTable.getTablePath,
@@ -106,7 +107,11 @@ class DistributedDataMapJob extends AbstractDataMapJob {
         filterInf.getFilterExpression.getFilterExpressionType == ExpressionType.UNKNOWN) {
       return filterProcessor.changeUnknownResloverToTrue(tableIdentifer)
     }
-    return filterInf;
+    filterInf
+  }
+
+  override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
+    IndexServer.getClient.getCount(dataMapFormat).get()
+  }
 }
@@ -122,7 +127,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
     dataMapFormat.setIsWriteToFile(false)
     dataMapFormat.setFallbackJob()
     val splits = IndexServer.getSplits(dataMapFormat).getExtendedBlockets(dataMapFormat
-      .getCarbonTable.getTablePath, dataMapFormat.getQueryId)
+      .getCarbonTable.getTablePath, dataMapFormat.getQueryId, dataMapFormat.isCountStarJob)
     // Fire a job to clear the cache from executors as Embedded mode does not maintain the cache.
     IndexServer.invalidateSegmentCache(dataMapFormat.getCarbonTable, dataMapFormat
       .getValidSegmentIds.asScala.toArray)
@@ -130,4 +135,8 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
     splits
   }
 
+  override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
+    IndexServer.getCount(dataMapFormat).get()
+  }
+
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
new file mode 100644
index 00000000000..be8ddfc06fa
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.indexserver
+
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+import scala.concurrent.duration.Duration
+
+import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.hadoop.mapreduce.{InputSplit, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DistributableDataMapFormat}
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
+import org.apache.carbondata.spark.rdd.CarbonRDD
+
+class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: DistributableDataMapFormat)
+  extends CarbonRDD[(String, String)](ss, Nil) {
+
+  @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
+    .getName)
+
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
+  override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
+      split.asInstanceOf[DataMapRDDPartition].getLocations.toSeq
+    } else {
+      Seq()
+    }
+  }
+
+  private def groupSplits(xs: Seq[InputSplit], n: Int) = {
+    val (quot, rem) = (xs.size / n, xs.size % n)
+    val (smaller, bigger) = xs.splitAt(xs.size - rem * (quot + 1))
+    (smaller.grouped(quot) ++ bigger.grouped(quot + 1)).toList
+  }
+
+  override def internalCompute(split: Partition,
+      context: TaskContext): Iterator[(String, String)] = {
+    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+    val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
+    val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
+    val numOfThreads = CarbonProperties.getInstance().getNumOfThreadsForExecutorPruning
+    val service = Executors
+      .newFixedThreadPool(numOfThreads, new CarbonThreadFactory("IndexPruningPool", true))
+    implicit val ec: ExecutionContextExecutor = ExecutionContext
+      .fromExecutor(service)
+    val futures = if (inputSplits.length <= numOfThreads) {
+      inputSplits.map {
+        split => generateFuture(Seq(split), attemptContext)
+      }
+    } else {
+      groupSplits(inputSplits, numOfThreads).map {
+        splits => generateFuture(splits, attemptContext)
+      }
+    }
+    // scalastyle:off awaitresult
+    val results = Await.result(Future.sequence(futures), Duration.Inf).flatten
+    // scalastyle:on awaitresult
+    val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+      SparkEnv.get.blockManager.blockManagerId.executorId
+    }"
+    val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
+      CacheProvider.getInstance().getCarbonCache.getCurrentSize
+    } else {
+      0L
+    }
+    if (dataMapFormat.isCountStarJob) {
+      results.map {
+        case (path, count) => (executorIP + "_" + cacheSize.toString, path + "-" + count)
+      }.toIterator
+    } else {
+      Iterator((executorIP + "_" + cacheSize.toString, results.map(_._2.toLong).sum.toString))
+    }
+  }
+
+  override protected def internalGetPartitions: Array[Partition] = {
+    new DistributedPruneRDD(ss, dataMapFormat).partitions
+  }
+
+  private def generateFuture(split: Seq[InputSplit], attemptContextImpl: TaskAttemptContextImpl)
+    (implicit executionContext: ExecutionContext) = {
+    Future {
+      val segments = split.map { inputSplit =>
+        val distributable = inputSplit.asInstanceOf[DataMapDistributableWrapper]
+        distributable.getDistributable.getSegment
+          .setReadCommittedScope(dataMapFormat.getReadCommittedScope)
+        distributable.getDistributable.getSegment
+      }
+      val defaultDataMap = DataMapStoreManager.getInstance
+        .getDataMap(dataMapFormat.getCarbonTable, split.head
+          .asInstanceOf[DataMapDistributableWrapper].getDistributable.getDataMapSchema)
+      defaultDataMap.getBlockRowCount(segments.toList.asJava, dataMapFormat
+        .getPartitions, defaultDataMap).asScala
+    }
+  }
+
+}
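[Illustrative note, not part of the patch] DistributedCountRDD.groupSplits spreads the input splits of one Spark partition over the executor-side pruning thread pool: the first groups get quot splits each and the remaining rem groups get quot + 1. A rough Java equivalent of that grouping, assuming there are more splits than threads (the only case in which the RDD calls it):

import java.util.ArrayList;
import java.util.List;

public class GroupSplitsSketch {
  // Distribute `items` over `n` groups: (n - rem) groups of size quot, rem groups of size quot + 1.
  static <T> List<List<T>> groupSplits(List<T> items, int n) {
    int quot = items.size() / n;       // assumed >= 1, i.e. items.size() > n
    int rem = items.size() % n;
    int smallerEnd = items.size() - rem * (quot + 1);
    List<List<T>> groups = new ArrayList<>();
    for (int i = 0; i < smallerEnd; i += quot) {
      groups.add(items.subList(i, i + quot));
    }
    for (int i = smallerEnd; i < items.size(); i += quot + 1) {
      groups.add(items.subList(i, i + quot + 1));
    }
    return groups;
  }

  public static void main(String[] args) {
    List<Integer> splits = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      splits.add(i);
    }
    // 10 splits over 4 threads: two groups of 2 and two groups of 3.
    System.out.println(groupSplits(splits, 4));
  }
}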
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index d8b9c195ca1..7baa73b1f46 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -139,7 +139,7 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
       }"
       val value = (executorIP + "_" + cacheSize.toString, new ExtendedBlockletWrapper(f.toList
         .asJava, dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId,
-        dataMapFormat.isWriteToFile))
+        dataMapFormat.isWriteToFile, dataMapFormat.isCountStarJob))
       Iterator(value)
     }
   }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 718bb743334..664110648e3 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
 import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.security.{KerberosInfo, SecurityUtil, UserGroupInformation}
@@ -58,6 +59,9 @@ trait ServerInterface {
    */
   def invalidateSegmentCache(carbonTable: CarbonTable,
       segmentIds: Array[String], jobGroupId: String = ""): Unit
+
+  def getCount(request: DistributableDataMapFormat): LongWritable
+
 }
 
 /**
@@ -99,6 +103,21 @@ object IndexServer extends ServerInterface {
     })
   }
 
+  def getCount(request: DistributableDataMapFormat): LongWritable = {
+    doAs {
+      if (!request.isFallbackJob) {
+        sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
+        sparkSession.sparkContext
+          .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
+      }
+      val splits = new DistributedCountRDD(sparkSession, request).collect()
+      if (!request.isFallbackJob) {
+        DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+      }
+      new LongWritable(splits.map(_._2.toLong).sum)
+    }
+  }
+
   def getSplits(request: DistributableDataMapFormat): ExtendedBlockletWrapperContainer = {
     doAs {
       if (!request.isFallbackJob) {