diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
index 398e03a864b..cfcbe44ae1f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
@@ -24,4 +24,6 @@ public interface CarbonCommonConstantsInternal {

   String QUERY_ON_PRE_AGG_STREAMING = "carbon.query.on.preagg.streaming.";

+  String ROW_COUNT = "rowCount";
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 0d46fd8af9a..15b0e8bdcd6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -18,6 +18,7 @@

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -34,6 +35,7 @@
 import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainBlocklet;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -499,4 +501,46 @@ public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp)
     }
     return prunedSegments;
   }
+
+  /**
+   * Prune the datamaps of the given segments and return a map of block path to row count.
+   *
+   * @param segments valid segments to count
+   * @param partitions partitions to restrict the count, null for non-partitioned tables
+   * @return map of (segmentNo + "," + carbondata file name) to the row count of that block
+   * @throws IOException
+   */
+  public Map<String, Long> getBlockRowCount(List<Segment> segments,
+      final List<PartitionSpec> partitions, TableDataMap defaultDataMap)
+      throws IOException {
+    Map<String, Long> blockletToRowCountMap = new HashMap<>();
+    for (Segment segment : segments) {
+      List<CoarseGrainDataMap> dataMaps =
+          defaultDataMap.getDataMapFactory().getDataMaps(segment);
+      for (CoarseGrainDataMap dataMap : dataMaps) {
+        dataMap.getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
+      }
+    }
+    return blockletToRowCountMap;
+  }
+
+  /**
+   * Prune the datamaps of the given segments and return the total row count.
+   *
+   * @param segments valid segments to count
+   * @param partitions partitions to restrict the count, null for non-partitioned tables
+   * @return total row count of all blocks in the given segments
+   * @throws IOException
+   */
+  public long getRowCount(List<Segment> segments, final List<PartitionSpec> partitions,
+      TableDataMap defaultDataMap) throws IOException {
+    long totalRowCount = 0L;
+    for (Segment segment : segments) {
+      List<CoarseGrainDataMap> dataMaps =
+          defaultDataMap.getDataMapFactory().getDataMaps(segment);
+      for (CoarseGrainDataMap dataMap : dataMaps) {
+        totalRowCount += dataMap.getRowCount(segment, partitions);
+      }
+    }
+    return totalRowCount;
+  }
+
 }
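For orientation, a hedged usage sketch of the two new TableDataMap entry points (not part of the patch; `table`, `segments`, and `partitions` are assumed to already be in scope):

    TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);

    // Aggregated path: one long read per task summary, no per-block map materialized.
    long total = defaultDataMap.getRowCount(segments, partitions, defaultDataMap);

    // Per-block path: keys follow the "<segmentNo>,<carbondata file name>" convention.
    Map<String, Long> perBlock =
        defaultDataMap.getBlockRowCount(segments, partitions, defaultDataMap);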
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index c52cc4194cf..adc74b90278 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -18,8 +18,10 @@

 import java.io.IOException;
 import java.util.List;
+import java.util.Map;

 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -54,6 +56,19 @@ List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
   List<Blocklet> prune(Expression filter, SegmentProperties segmentProperties,
       List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException;

+  /**
+   * Prune the data map for finding the total row count of a segment.
+   */
+  long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException;
+
+  /**
+   * Prune the data map for finding the row count of each block. It returns a Map of
+   * blocklet path and the row count.
+   */
+  Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
+      Map<String, Long> blockletToRowCountMap) throws IOException;
+
   // TODO Move this method to Abstract class
   /**
    * Validate whether the current segment needs to be fetching the required data
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
index b4af9d9e563..3aba163b60a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
@@ -18,9 +18,11 @@

 import java.io.IOException;
 import java.util.List;
+import java.util.Map;

 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
@@ -41,6 +43,18 @@ public List<Blocklet> prune(Expression expression, SegmentProperties segmentProperties,
     throw new UnsupportedOperationException("Filter expression not supported");
   }

+  @Override
+  public long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public Map<String, Long> getRowCountForEachBlock(Segment segment,
+      List<PartitionSpec> partitions, Map<String, Long> blockletToRowCountMap) throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   @Override
   public int getNumberOfEntries() {
     // keep default, one record in one datamap
     return 1;
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
index 03b2bfb1487..3a47df16c40 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
@@ -18,9 +18,11 @@

 import java.io.IOException;
 import java.util.List;
+import java.util.Map;

 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -40,6 +42,17 @@ public List<FineGrainBlocklet> prune(Expression filter, SegmentProperties segmentProperties,
     throw new UnsupportedOperationException("Filter expression not supported");
   }

+  @Override
+  public long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public Map<String, Long> getRowCountForEachBlock(Segment segment,
+      List<PartitionSpec> partitions, Map<String, Long> blockletToRowCountMap) throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   @Override
   public int getNumberOfEntries() {
     // keep default, one record in one datamap
     return 1;
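Both abstract classes only throw by default, so a concrete datamap must opt in explicitly. A hedged sketch of what an implementor might add — the class name and helper are hypothetical, and the other abstract DataMap methods are elided:

    public class RowCountingDataMap extends CoarseGrainDataMap {

      @Override
      public long getRowCount(Segment segment, List<PartitionSpec> partitions)
          throws IOException {
        // Read the count from this datamap's own index instead of throwing
        // like the default above; countFromIndex(...) is a hypothetical helper.
        return countFromIndex(segment, partitions);
      }

      // prune(...), clear(), isScanRequired(...) and the rest omitted for brevity
    }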
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index a7818c210f7..8ebd50d6c96 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -23,10 +23,13 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;

 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -217,6 +220,7 @@ protected DataMapRowImpl loadBlockInfoForOldStore(CarbonRowSchema[] taskSummarySchema,
     CarbonRowSchema[] schema = getFileFooterEntrySchema();
     boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
     Arrays.fill(minMaxFlag, true);
+    long totalRowCount = 0;
     for (DataFileFooter fileFooter : indexInfo) {
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       BlockMetaInfo blockMetaInfo =
@@ -241,11 +245,14 @@ protected DataMapRowImpl loadBlockInfoForOldStore(CarbonRowSchema[] taskSummarySchema,
         summaryRow = loadToUnsafeBlock(schema, taskSummarySchema, fileFooter, segmentProperties,
             getMinMaxCacheColumns(), blockInfo.getFilePath(), summaryRow, blockMetaInfo,
             updatedMinValues, updatedMaxValues, minMaxFlag);
+        totalRowCount += fileFooter.getNumberOfRows();
       }
     }
     List<Short> blockletCountList = new ArrayList<>();
     blockletCountList.add((short) 0);
     byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountList);
+    // set the total row count
+    summaryRow.setLong(totalRowCount, TASK_ROW_COUNT);
     summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1);
     setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties, minMaxFlag);
     return summaryRow;
@@ -289,6 +296,7 @@ private DataMapRowImpl loadBlockMetaInfo(CarbonRowSchema[] taskSummarySchema,
     // min max flag for task summary
     boolean[] taskSummaryMinMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
     Arrays.fill(taskSummaryMinMaxFlag, true);
+    long totalRowCount = 0;
     for (DataFileFooter fileFooter : indexInfo) {
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       BlockMetaInfo blockMetaInfo =
@@ -331,6 +339,7 @@ segmentProperties, getMinMaxCacheColumns(), previousBlockInfo.getFilePath(),
               summaryRow,
               blockletDataMapInfo.getBlockMetaInfoMap().get(previousBlockInfo.getFilePath()),
               blockMinValues, blockMaxValues, minMaxFlag);
+          totalRowCount += previousDataFileFooter.getNumberOfRows();
           minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
           Arrays.fill(minMaxFlag, true);
           // flag to check whether last file footer entry is different from previous entry.
@@ -361,9 +370,12 @@ segmentProperties, getMinMaxCacheColumns(), previousBlockInfo.getFilePath(),
           blockletDataMapInfo.getBlockMetaInfoMap()
               .get(previousDataFileFooter.getBlockInfo().getTableBlockInfo().getFilePath()),
           blockMinValues, blockMaxValues, minMaxFlag);
+      totalRowCount += previousDataFileFooter.getNumberOfRows();
       blockletCountInEachBlock.add(totalBlockletsInOneBlock);
     }
     byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountInEachBlock);
+    // set the total row count
+    summaryRow.setLong(totalRowCount, TASK_ROW_COUNT);
     // blocklet count index is the last index
     summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1);
     setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties,
@@ -409,7 +421,7 @@ protected DataMapRowImpl loadToUnsafeBlock(CarbonRowSchema[] schema,
     }
     DataMapRow row = new DataMapRowImpl(schema);
     int ordinal = 0;
-    int taskMinMaxOrdinal = 0;
+    int taskMinMaxOrdinal = 1;
     // get min max values for columns to be cached
     byte[][] minValuesForColumnsToBeCached = BlockletDataMapUtil
         .getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns, minValues);
@@ -648,6 +660,49 @@ protected int getTotalBlocklets() {
     return sum;
   }

+  @Override
+  public long getRowCount(Segment segment, List<PartitionSpec> partitions) {
+    long totalRowCount =
+        taskSummaryDMStore.getDataMapRow(getTaskSummarySchema(), 0).getLong(TASK_ROW_COUNT);
+    if (totalRowCount == 0) {
+      Map<String, Long> blockletToRowCountMap = new HashMap<>();
+      getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
+      for (long blockletRowCount : blockletToRowCountMap.values()) {
+        totalRowCount += blockletRowCount;
+      }
+    } else {
+      if (taskSummaryDMStore.getRowCount() == 0) {
+        return 0L;
+      }
+    }
+    return totalRowCount;
+  }
+
+  public Map<String, Long> getRowCountForEachBlock(Segment segment,
+      List<PartitionSpec> partitions, Map<String, Long> blockletToRowCountMap) {
+    if (memoryDMStore.getRowCount() == 0) {
+      return new HashMap<>();
+    }
+    // if it is a partitioned datamap but no partition information is stored, it means
+    // the partitions are dropped, so return an empty map.
+    if (partitions != null) {
+      if (!validatePartitionInfo(partitions)) {
+        return new HashMap<>();
+      }
+    }
+    CarbonRowSchema[] schema = getFileFooterEntrySchema();
+    int numEntries = memoryDMStore.getRowCount();
+    for (int i = 0; i < numEntries; i++) {
+      DataMapRow dataMapRow = memoryDMStore.getDataMapRow(schema, i);
+      String fileName = new String(dataMapRow.getByteArray(FILE_PATH_INDEX),
+          CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + CarbonTablePath.getCarbonDataExtension();
+      int rowCount = dataMapRow.getInt(ROW_COUNT_INDEX);
+      // prepend the segment number to the block file path
+      blockletToRowCountMap.put((segment.getSegmentNo() + "," + fileName), (long) rowCount);
+    }
+    return blockletToRowCountMap;
+  }
+
   private List<Blocklet> prune(FilterResolverIntf filterExp) {
     if (memoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
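The contract implemented above: the task summary keeps an aggregated count in the TASK_ROW_COUNT slot, and a stored zero means "not aggregated here, recompute from the per-block entries" — which is exactly what blocklet-level caching writes (see BlockletDataMap below). A hedged restatement of that fallback, with `dataMap`, `summaryRow`, `segment`, and `partitions` assumed to be in scope:

    long total = summaryRow.getLong(BlockletDataMapRowIndexes.TASK_ROW_COUNT);
    if (total == 0) {
      // Legacy stores and CACHE_LEVEL=BLOCKLET write 0 here, so fall back to
      // summing the per-block counts.
      Map<String, Long> perBlock = new HashMap<>();
      dataMap.getRowCountForEachBlock(segment, partitions, perBlock);
      for (long rows : perBlock.values()) {
        total += rows;
      }
    }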
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 191056dd6df..7939a17dfb5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -146,6 +146,7 @@ private DataMapRowImpl loadBlockletMetaInfo(CarbonRowSchema[] taskSummarySchema,
         relativeBlockletId += fileFooter.getBlockletList().size();
       }
     }
+    summaryRow.setLong(0L, TASK_ROW_COUNT);
     setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties,
         summaryRowMinMaxFlag);
     return summaryRow;
@@ -163,7 +164,7 @@ private DataMapRowImpl loadToUnsafe(CarbonRowSchema[] schema, CarbonRowSchema[]
     for (int index = 0; index < blockletList.size(); index++) {
       DataMapRow row = new DataMapRowImpl(schema);
       int ordinal = 0;
-      int taskMinMaxOrdinal = 0;
+      int taskMinMaxOrdinal = 1;
       BlockletInfo blockletInfo = blockletList.get(index);
       blockletInfo.setSorted(fileFooter.isSorted());
       BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
index 085fb7d5cd4..dcaecd2499e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
@@ -50,15 +50,17 @@ public interface BlockletDataMapRowIndexes {
   int BLOCKLET_ID_INDEX = 12;

   // Summary dataMap row indexes
-  int TASK_MIN_VALUES_INDEX = 0;
+  int TASK_ROW_COUNT = 0;

-  int TASK_MAX_VALUES_INDEX = 1;
+  int TASK_MIN_VALUES_INDEX = 1;

-  int SUMMARY_INDEX_FILE_NAME = 2;
+  int TASK_MAX_VALUES_INDEX = 2;

-  int SUMMARY_SEGMENTID = 3;
+  int SUMMARY_INDEX_FILE_NAME = 3;

-  int TASK_MIN_MAX_FLAG = 4;
+  int SUMMARY_SEGMENTID = 4;

-  int SUMMARY_INDEX_PATH = 5;
+  int TASK_MIN_MAX_FLAG = 5;
+
+  int SUMMARY_INDEX_PATH = 6;
 }
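Because a LONG slot is prepended to the task summary schema (see SchemaGenerator just below), every existing summary index shifts up by one; that is also why taskMinMaxOrdinal now starts at 1 in both loaders above. A hedged sanity check of the new layout:

    assert BlockletDataMapRowIndexes.TASK_ROW_COUNT == 0;        // new LONG slot
    assert BlockletDataMapRowIndexes.TASK_MIN_VALUES_INDEX == 1; // shifted from 0
    assert BlockletDataMapRowIndexes.TASK_MAX_VALUES_INDEX == 2; // shifted from 1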
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
index 7a2e13a5f91..52b9fb37133 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
@@ -113,6 +113,8 @@ public static CarbonRowSchema[] createTaskSummarySchema(SegmentProperties segmentProperties,
       List<CarbonDimension> minMaxCacheColumns,
       boolean storeBlockletCount, boolean filePathToBeStored) throws MemoryException {
     List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
+    // for number of rows.
+    taskMinMaxSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
     // get MinMax Schema
     getMinMaxSchema(segmentProperties, taskMinMaxSchemas, minMaxCacheColumns);
     // for storing file name
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index bd8c4658523..a632f039afd 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -28,6 +28,7 @@

 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -747,9 +748,12 @@ public static List<Segment> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) {

   /**
    * Return row count of input block
    */
-  public static long getRowCount(
-      BlockMappingVO blockMappingVO,
-      CarbonTable carbonTable) {
+  public static long getRowCount(BlockMappingVO blockMappingVO, CarbonTable carbonTable) {
+    if (blockMappingVO.getBlockRowCountMapping().size() == 1
+        && blockMappingVO.getBlockRowCountMapping().get(CarbonCommonConstantsInternal.ROW_COUNT)
+        != null) {
+      return blockMappingVO.getBlockRowCountMapping().get(CarbonCommonConstantsInternal.ROW_COUNT);
+    }
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(carbonTable);
     long rowCount = 0;
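The aggregated path publishes a single synthetic entry under the "rowCount" key, which the reworked getRowCount short-circuits on before touching any update-status metadata. A hedged sketch of that contract (the 1000L value and `carbonTable` are illustrative):

    Map<String, Long> blockRowCountMapping = new HashMap<>();
    blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT, 1000L);
    BlockMappingVO vo = new BlockMappingVO(blockRowCountMapping, new HashMap<String, Long>());
    // Returns 1000 directly, without creating a SegmentUpdateStatusManager.
    long rows = CarbonUpdateUtil.getRowCount(vo, carbonTable);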
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 281143b0ce3..4ba8b8cc44e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -28,11 +28,11 @@
 import java.util.Map;

 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -58,7 +58,6 @@
 import org.apache.carbondata.core.stream.StreamFile;
 import org.apache.carbondata.core.stream.StreamPruner;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;

 import org.apache.hadoop.conf.Configuration;
@@ -576,7 +575,7 @@ public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope)
    * Get the row count of the Block and mapping of segment and Block count.
    */
   public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,
-      List<PartitionSpec> partitions) throws IOException {
+      List<PartitionSpec> partitions, boolean isUpdateFlow) throws IOException {
     // Normal query flow goes to CarbonInputFormat#getPrunedBlocklets and initialize the
     // pruning info for table we queried. But here count star query without filter uses a different
     // query plan, and no pruning info is initialized. When it calls default data map to
@@ -586,7 +585,7 @@ public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,
     ExplainCollector.remove();

     AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
-    TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
+    TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);

     ReadCommittedScope readCommittedScope = getReadCommitted(job, identifier);
     LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
@@ -602,6 +601,7 @@ public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,
     // TODO: currently only batch segment is supported, add support for streaming table
     List<Segment> filteredSegment =
         getFilteredSegment(job, allSegments.getValidSegments(), false, readCommittedScope);
+    boolean isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
     /* In the select * flow, getSplits() method was clearing the segmentMap if,
     segment needs refreshing. same thing need for select count(*) flow also.
     For NonTransactional table, one of the reason for a segment refresh is below scenario.
@@ -624,36 +624,40 @@ segment needs refreshing. same thing need for select count(*) flow also.
           .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), toBeCleanedSegments);
     }
-    List<ExtendedBlocklet> blocklets =
-        blockletMap.prune(filteredSegment, (FilterResolverIntf) null, partitions);
-    for (ExtendedBlocklet blocklet : blocklets) {
-      String blockName = blocklet.getPath();
-      blockName = CarbonTablePath.getCarbonDataFileName(blockName);
-      blockName = blockName + CarbonTablePath.getCarbonDataExtension();
-
-      long rowCount = blocklet.getDetailInfo().getRowCount();
-
-      String segmentId = Segment.toSegment(blocklet.getSegmentId()).getSegmentNo();
-      String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName);
-
-      // if block is invalid then don't add the count
-      SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
-
-      if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
-        Long blockCount = blockRowCountMapping.get(key);
-        if (blockCount == null) {
-          blockCount = 0L;
-          Long count = segmentAndBlockCountMapping.get(segmentId);
-          if (count == null) {
-            count = 0L;
-          }
-          segmentAndBlockCountMapping.put(segmentId, count + 1);
-        }
-        blockCount += rowCount;
-        blockRowCountMapping.put(key, blockCount);
-      }
-    }
+    if (isIUDTable || isUpdateFlow) {
+      Map<String, Long> blockletToRowCountMap =
+          defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap);
+      // key is (segmentId + "," + blockletPath) and value is the row count of that blocklet
+      for (Map.Entry<String, Long> eachBlocklet : blockletToRowCountMap.entrySet()) {
+        String[] segmentIdAndPath = eachBlocklet.getKey().split(",", 2);
+        String segmentId = segmentIdAndPath[0];
+        String blockName = segmentIdAndPath[1];
+
+        long rowCount = eachBlocklet.getValue();
+
+        String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName);
+
+        // if block is invalid then don't add the count
+        SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
+
+        if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
+          Long blockCount = blockRowCountMapping.get(key);
+          if (blockCount == null) {
+            blockCount = 0L;
+            Long count = segmentAndBlockCountMapping.get(segmentId);
+            if (count == null) {
+              count = 0L;
+            }
+            segmentAndBlockCountMapping.put(segmentId, count + 1);
+          }
+          blockCount += rowCount;
+          blockRowCountMapping.put(key, blockCount);
+        }
+      }
+    } else {
+      long totalRowCount = defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap);
+      blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT, totalRowCount);
+    }
-
     return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
   }
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 7c9a9fc5766..001964a33d7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.dev.DataMap
 import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment, TableDataMap}
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
-import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, BlockletDataMap}
+import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, BlockletDataMap, BlockletDataMapRowIndexes}
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema
 import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -93,7 +93,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll {
     val index = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
     val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance()
       .getSegmentPropertiesWrapper(index).getTaskSummarySchemaForBlock(storeBlockletCount, false)
-    val minSchemas = summarySchema(0).asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
+    val minSchemas = summarySchema(BlockletDataMapRowIndexes.TASK_MIN_VALUES_INDEX)
+      .asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
       .getChildSchemas
     minSchemas.length == expectedLength
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 297cb54f3b5..cfceea453ec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -64,7 +64,7 @@ case class CarbonCountStar(
         sparkSession,
         TableIdentifier(
           carbonTable.getTableName,
-          Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
+          Some(carbonTable.getDatabaseName))).map(_.asJava).orNull, false),
       carbonTable)
     val valueRaw =
       attributesRaw.head.dataType match {
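A note on the key convention used in the IUD branch above, sketched under the assumption that block file names may themselves contain commas: BlockDataMap builds the key as segmentNo + "," + fileName, and split(",", 2) only splits at the first comma, so the round trip is lossless:

    String key = segment.getSegmentNo() + "," + blockName; // built in BlockDataMap
    String[] segmentIdAndPath = key.split(",", 2);         // parsed here
    // segmentIdAndPath[0] is the segment number, segmentIdAndPath[1] the block name,
    // even if blockName contains further commas.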
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index a88a02b1f30..7337496378b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -104,7 +104,7 @@ object DeleteExecution {
       CarbonFilters.getPartitions(
         Seq.empty,
         sparkSession,
-        TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull)
+        TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull, true)
     val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable)
     CarbonUpdateUtil
       .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
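The two Scala call sites pin down the new flag's meaning: CarbonCountStar passes false, so a plain count(*) on a table without updates or deletes takes the aggregated fast path, while DeleteExecution passes true because delete bookkeeping needs the per-block counts. A hedged Java rendering of the same calls (`format`, `job`, `table`, and `partitions` assumed in scope):

    // count(*) — aggregated path allowed:
    BlockMappingVO forCountStar = format.getBlockRowCount(job, table, partitions, false);
    // delete flow — per-block counts required:
    BlockMappingVO forDelete = format.getBlockRowCount(job, table, partitions, true);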