Merge 1ae8506 into 66982f3
dhatchayani committed Mar 21, 2019
2 parents 66982f3 + 1ae8506 commit b870a48
Showing 14 changed files with 202 additions and 45 deletions.
@@ -24,4 +24,6 @@ public interface CarbonCommonConstantsInternal {

String QUERY_ON_PRE_AGG_STREAMING = "carbon.query.on.preagg.streaming.";

String ROW_COUNT = "rowCount";

}
@@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -34,6 +35,7 @@
import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainBlocklet;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -499,4 +501,46 @@ public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf fi
}
return prunedSegments;
}

/**
 * Prune the datamaps of the given segments and return a map of block path to row count.
 *
 * @param segments segments whose datamaps are pruned
 * @param partitions partitions to restrict the prune to
 * @return map of block path to row count
 * @throws IOException
 */
public Map<String, Long> getBlockRowCount(List<Segment> segments,
final List<PartitionSpec> partitions, TableDataMap defaultDataMap)
throws IOException {
Map<String, Long> blockletToRowCountMap = new HashMap<>();
for (Segment segment : segments) {
List<CoarseGrainDataMap> dataMaps = defaultDataMap.getDataMapFactory().getDataMaps(segment);
for (CoarseGrainDataMap dataMap : dataMaps) {
dataMap.getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
}
}
return blockletToRowCountMap;
}

/**
 * Prune the datamaps of the given segments and return the total row count.
 *
 * @param segments segments whose datamaps are pruned
 * @param partitions partitions to restrict the prune to
 * @return total row count of the given segments
 * @throws IOException
 */
public long getRowCount(List<Segment> segments, final List<PartitionSpec> partitions,
TableDataMap defaultDataMap) throws IOException {
long totalRowCount = 0L;
for (Segment segment : segments) {
List<CoarseGrainDataMap> dataMaps = defaultDataMap.getDataMapFactory().getDataMaps(segment);
for (CoarseGrainDataMap dataMap : dataMaps) {
totalRowCount += dataMap.getRowCount(segment, partitions);
}
}
return totalRowCount;
}
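
// Editorial sketch, not part of this commit: how a caller inside this class
// might combine the two helpers above; the consistency check is illustrative.
private void logRowCounts(List<Segment> segments, List<PartitionSpec> partitions,
    TableDataMap defaultDataMap) throws IOException {
  Map<String, Long> perBlock = getBlockRowCount(segments, partitions, defaultDataMap);
  long sum = 0L;
  for (long rows : perBlock.values()) {
    sum += rows;
  }
  // both helpers walk the same CoarseGrainDataMaps, so the totals should agree
  long total = getRowCount(segments, partitions, defaultDataMap);
  assert sum == total : "per-block sum " + sum + " != aggregate " + total;
}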

}
@@ -18,8 +18,10 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -54,6 +56,19 @@ List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
List<T> prune(Expression filter, SegmentProperties segmentProperties,
List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException;

/**
 * Prune the data maps to find the total row count of the segment.
 */
long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException;

/**
 * Prune the data maps to find the row count of each block. Returns a map of
 * block path to row count.
 */
Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
Map<String, Long> blockletToRowCountMap) throws IOException;

// TODO Move this method to Abstract class
/**
* Validate whether the current segment needs to be fetching the required data
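A minimal sketch (not part of this commit) of the method bodies an implementation might use to satisfy the two new row-count methods; the standalone class and `blockRows`, a per-block-file row-count index, are hypothetical, and the wiring into an actual DataMap subclass (init/prune/clear and so on) is omitted:

import java.util.HashMap;
import java.util.Map;

class RowCountSketch {
  // hypothetical index: block file name -> row count
  private final Map<String, Long> blockRows = new HashMap<>();

  long getRowCount() {
    long total = 0L;
    for (long rows : blockRows.values()) {
      total += rows;
    }
    return total;
  }

  Map<String, Long> getRowCountForEachBlock(String segmentNo,
      Map<String, Long> blockletToRowCountMap) {
    for (Map.Entry<String, Long> e : blockRows.entrySet()) {
      // key convention used by the default implementation: "<segmentNo>,<file path>"
      blockletToRowCountMap.put(segmentNo + "," + e.getKey(), e.getValue());
    }
    return blockletToRowCountMap;
  }
}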
@@ -18,9 +18,11 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.indexstore.Blocklet;
@@ -41,6 +43,18 @@ public List<Blocklet> prune(Expression expression, SegmentProperties segmentProp
throw new UnsupportedOperationException("Filter expression not supported");
}

@Override
public long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException {
throw new UnsupportedOperationException("Operation not supported");
}

@Override
public Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
Map<String, Long> blockletToRowCountMap) throws IOException {
throw new UnsupportedOperationException("Operation not supported");
}


@Override public int getNumberOfEntries() {
// keep default, one record in one datamap
return 1;
@@ -18,9 +18,11 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -40,6 +42,17 @@ public List<FineGrainBlocklet> prune(Expression filter, SegmentProperties segmen
throw new UnsupportedOperationException("Filter expression not supported");
}

@Override
public long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException {
throw new UnsupportedOperationException("Operation not supported");
}

@Override
public Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
Map<String, Long> blockletToRowCountMap) throws IOException {
throw new UnsupportedOperationException("Operation not supported");
}

@Override public int getNumberOfEntries() {
// keep default, one record in one datamap
return 1;
@@ -23,10 +23,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -217,6 +220,7 @@ protected DataMapRowImpl loadBlockInfoForOldStore(CarbonRowSchema[] taskSummaryS
CarbonRowSchema[] schema = getFileFooterEntrySchema();
boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(minMaxFlag, true);
long totalRowCount = 0;
for (DataFileFooter fileFooter : indexInfo) {
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
BlockMetaInfo blockMetaInfo =
@@ -241,11 +245,14 @@ protected DataMapRowImpl loadBlockInfoForOldStore(CarbonRowSchema[] taskSummaryS
summaryRow = loadToUnsafeBlock(schema, taskSummarySchema, fileFooter, segmentProperties,
getMinMaxCacheColumns(), blockInfo.getFilePath(), summaryRow,
blockMetaInfo, updatedMinValues, updatedMaxValues, minMaxFlag);
totalRowCount += fileFooter.getNumberOfRows();
}
}
List<Short> blockletCountList = new ArrayList<>();
blockletCountList.add((short) 0);
byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountList);
// set the total row count
summaryRow.setLong(totalRowCount, TASK_ROW_COUNT);
summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1);
setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties, minMaxFlag);
return summaryRow;
@@ -289,6 +296,7 @@ private DataMapRowImpl loadBlockMetaInfo(CarbonRowSchema[] taskSummarySchema,
// min max flag for task summary
boolean[] taskSummaryMinMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(taskSummaryMinMaxFlag, true);
long totalRowCount = 0;
for (DataFileFooter fileFooter : indexInfo) {
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
BlockMetaInfo blockMetaInfo =
@@ -331,6 +339,7 @@ segmentProperties, getMinMaxCacheColumns(), previousBlockInfo.getFilePath(),
summaryRow,
blockletDataMapInfo.getBlockMetaInfoMap().get(previousBlockInfo.getFilePath()),
blockMinValues, blockMaxValues, minMaxFlag);
totalRowCount += previousDataFileFooter.getNumberOfRows();
minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(minMaxFlag, true);
// flag to check whether last file footer entry is different from previous entry.
@@ -361,9 +370,12 @@ segmentProperties, getMinMaxCacheColumns(), previousBlockInfo.getFilePath(),
blockletDataMapInfo.getBlockMetaInfoMap()
.get(previousDataFileFooter.getBlockInfo().getTableBlockInfo().getFilePath()),
blockMinValues, blockMaxValues, minMaxFlag);
totalRowCount += previousDataFileFooter.getNumberOfRows();
blockletCountInEachBlock.add(totalBlockletsInOneBlock);
}
byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountInEachBlock);
// set the total row count
summaryRow.setLong(totalRowCount, TASK_ROW_COUNT);
// blocklet count index is the last index
summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1);
setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties,
@@ -409,7 +421,7 @@ protected DataMapRowImpl loadToUnsafeBlock(CarbonRowSchema[] schema,
}
DataMapRow row = new DataMapRowImpl(schema);
int ordinal = 0;
-int taskMinMaxOrdinal = 0;
+int taskMinMaxOrdinal = 1;
// get min max values for columns to be cached
byte[][] minValuesForColumnsToBeCached = BlockletDataMapUtil
.getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns, minValues);
@@ -648,6 +660,49 @@ protected int getTotalBlocklets() {
return sum;
}

@Override
public long getRowCount(Segment segment, List<PartitionSpec> partitions) {
long totalRowCount =
taskSummaryDMStore.getDataMapRow(getTaskSummarySchema(), 0).getLong(TASK_ROW_COUNT);
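// Editorial note: a stored total of 0 means the summary row carries no usable
// count (the blocklet-level datamap writes 0 for TASK_ROW_COUNT below), so the
// total is rebuilt from the per-block map instead.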
if (totalRowCount == 0) {
Map<String, Long> blockletToRowCountMap = new HashMap<>();
getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
for (long blockletRowCount : blockletToRowCountMap.values()) {
totalRowCount += blockletRowCount;
}
} else {
if (taskSummaryDMStore.getRowCount() == 0) {
return 0L;
}
}
return totalRowCount;
}

public Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
Map<String, Long> blockletToRowCountMap) {
if (memoryDMStore.getRowCount() == 0) {
return new HashMap<>();
}
// if it is a partitioned datamap but no partition information is stored, it means
// the partitions are dropped, so return an empty map.
if (partitions != null) {
if (!validatePartitionInfo(partitions)) {
return new HashMap<>();
}
}
CarbonRowSchema[] schema = getFileFooterEntrySchema();
int numEntries = memoryDMStore.getRowCount();
for (int i = 0; i < numEntries; i++) {
DataMapRow dataMapRow = memoryDMStore.getDataMapRow(schema, i);
String fileName = new String(dataMapRow.getByteArray(FILE_PATH_INDEX),
CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + CarbonTablePath.getCarbonDataExtension();
int rowCount = dataMapRow.getInt(ROW_COUNT_INDEX);
// prepend the segment number to the block file path
blockletToRowCountMap.put((segment.getSegmentNo() + "," + fileName), (long) rowCount);
}
return blockletToRowCountMap;
}
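
// Editorial sketch, not part of this commit: keys written above follow the
// "<segmentNo>,<carbondata file path>" convention, so a consumer can split on
// the first comma to recover both parts.
private static String[] splitBlockKey(String key) {
  return key.split(",", 2); // [0] = segment number, [1] = block file path
}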

private List<Blocklet> prune(FilterResolverIntf filterExp) {
if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
@@ -146,6 +146,7 @@ private DataMapRowImpl loadBlockletMetaInfo(CarbonRowSchema[] taskSummarySchema,
relativeBlockletId += fileFooter.getBlockletList().size();
}
}
summaryRow.setLong(0L, TASK_ROW_COUNT);
setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties,
summaryRowMinMaxFlag);
return summaryRow;
@@ -163,7 +164,7 @@ private DataMapRowImpl loadToUnsafe(CarbonRowSchema[] schema, CarbonRowSchema[]
for (int index = 0; index < blockletList.size(); index++) {
DataMapRow row = new DataMapRowImpl(schema);
int ordinal = 0;
-int taskMinMaxOrdinal = 0;
+int taskMinMaxOrdinal = 1;
BlockletInfo blockletInfo = blockletList.get(index);
blockletInfo.setSorted(fileFooter.isSorted());
BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
@@ -50,15 +50,17 @@ public interface BlockletDataMapRowIndexes {
int BLOCKLET_ID_INDEX = 12;

// Summary dataMap row indexes
-int TASK_MIN_VALUES_INDEX = 0;
+int TASK_ROW_COUNT = 0;

-int TASK_MAX_VALUES_INDEX = 1;
+int TASK_MIN_VALUES_INDEX = 1;

-int SUMMARY_INDEX_FILE_NAME = 2;
+int TASK_MAX_VALUES_INDEX = 2;

-int SUMMARY_SEGMENTID = 3;
+int SUMMARY_INDEX_FILE_NAME = 3;

-int TASK_MIN_MAX_FLAG = 4;
+int SUMMARY_SEGMENTID = 4;

-int SUMMARY_INDEX_PATH = 5;
+int TASK_MIN_MAX_FLAG = 5;

+int SUMMARY_INDEX_PATH = 6;
}
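
A short sketch (not from this commit) of reading the reordered summary row, written as it might appear inside BlockDataMap (taskSummaryDMStore and getTaskSummarySchema() are members that file already uses):

private long readTaskRowCount() {
  // TASK_ROW_COUNT now sits at ordinal 0 and every later summary field shifts up
  // by one, which is also why taskMinMaxOrdinal starts at 1 in the loaders above
  DataMapRow summary = taskSummaryDMStore.getDataMapRow(getTaskSummarySchema(), 0);
  return summary.getLong(BlockletDataMapRowIndexes.TASK_ROW_COUNT);
}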
@@ -113,6 +113,8 @@ public static CarbonRowSchema[] createTaskSummarySchema(SegmentProperties segmen
List<CarbonColumn> minMaxCacheColumns,
boolean storeBlockletCount, boolean filePathToBeStored) throws MemoryException {
List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
// for number of rows.
taskMinMaxSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
// get MinMax Schema
getMinMaxSchema(segmentProperties, taskMinMaxSchemas, minMaxCacheColumns);
// for storing file name
@@ -28,6 +28,7 @@

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -747,9 +748,12 @@ public static List<Segment> getListOfSegmentsToMarkDeleted(Map<String, Long> seg
/**
* Return row count of input block
*/
-public static long getRowCount(
-BlockMappingVO blockMappingVO,
-CarbonTable carbonTable) {
+public static long getRowCount(BlockMappingVO blockMappingVO, CarbonTable carbonTable) {
if (blockMappingVO.getBlockRowCountMapping().size() == 1
&& blockMappingVO.getBlockRowCountMapping().get(CarbonCommonConstantsInternal.ROW_COUNT)
!= null) {
return blockMappingVO.getBlockRowCountMapping().get(CarbonCommonConstantsInternal.ROW_COUNT);
}
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(carbonTable);
long rowCount = 0;
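The early return added above is a fast path: when the scan layer has already computed the total, it stores it under the single ROW_COUNT marker key instead of per-block entries. A minimal self-contained sketch of that contract, using a hypothetical total of 42,000 rows:

import java.util.HashMap;
import java.util.Map;

import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;

class RowCountFastPathSketch {
  // returns the precomputed total when the mapping holds only the marker entry,
  // or null to signal that per-block aggregation is still required
  static Long fastPathRowCount(Map<String, Long> blockRowCountMapping) {
    if (blockRowCountMapping.size() == 1
        && blockRowCountMapping.get(CarbonCommonConstantsInternal.ROW_COUNT) != null) {
      return blockRowCountMapping.get(CarbonCommonConstantsInternal.ROW_COUNT);
    }
    return null;
  }

  public static void main(String[] args) {
    Map<String, Long> mapping = new HashMap<>();
    mapping.put(CarbonCommonConstantsInternal.ROW_COUNT, 42000L); // hypothetical
    System.out.println(fastPathRowCount(mapping)); // prints 42000
  }
}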
