diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java
index 570a1ce7d15..c96d4a009f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java
@@ -35,4 +35,8 @@ public interface DataMapBuilder {
   void finish() throws IOException;
 
   void close() throws IOException;
+  /**
+   * Whether to create the index on internal carbon bytes (such as dictionary encoded) or original value
+   */
+  boolean isIndexForCarbonRawBytes();
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
index 0b1dde512f3..8902fc24b89 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java
@@ -26,6 +26,7 @@
 import org.apache.carbondata.core.scan.collector.impl.RestructureBasedRawResultCollector;
 import org.apache.carbondata.core.scan.collector.impl.RestructureBasedVectorResultCollector;
 import org.apache.carbondata.core.scan.collector.impl.RowIdBasedResultCollector;
+import org.apache.carbondata.core.scan.collector.impl.RowIdRawBasedResultCollector;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 
 /**
@@ -52,6 +53,9 @@ public static AbstractScannedResultCollector getScannedResultCollector(
       if (blockExecutionInfo.isRestructuredBlock()) {
         LOGGER.info("Restructure based raw collector is used to scan and collect the data");
         scannerResultAggregator = new RestructureBasedRawResultCollector(blockExecutionInfo);
+      } else if (blockExecutionInfo.isRequiredRowId()) {
+        LOGGER.info("RowId based raw collector is used to scan and collect the data");
+        scannerResultAggregator = new RowIdRawBasedResultCollector(blockExecutionInfo);
       } else {
         LOGGER.info("Row based raw collector is used to scan and collect the data");
         scannerResultAggregator = new RawBasedResultCollector(blockExecutionInfo);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRawBasedResultCollector.java
new file mode 100644
index 00000000000..a68265d5902
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdRawBasedResultCollector.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.collector.impl;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+
+/**
+ * It is not a collector; it is just a scanned result holder.
+ * Most of the lines are copied from `RawBasedResultCollector`; the functional difference is that
+ * this class returns all the dimensions in a ByteArrayWrapper and appends blockletNo/pageId/rowId
+ * at the end of the row.
+ * This implementation refers to `RawBasedResultCollector` and `RowIdBasedResultCollector`
+ */
+@InterfaceAudience.Internal
+public class RowIdRawBasedResultCollector extends AbstractScannedResultCollector {
+
+  public RowIdRawBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+  }
+
+  /**
+   * This method will add a record, both key and value, to the list object.
+   * It will keep track of how many records are processed, to handle the limit scenario.
+   */
+  @Override
+  public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult,
+      int batchSize) {
+    long startTime = System.currentTimeMillis();
+    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
+    ProjectionMeasure[] queryMeasures = executionInfo.getProjectionMeasures();
+    // scan the record and add to list
+    scanAndFillData(scannedResult, batchSize, listBasedResult, queryMeasures);
+    QueryStatistic resultPrepTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.RESULT_PREP_TIME);
+    resultPrepTime.addCountStatistic(QueryStatisticsConstants.RESULT_PREP_TIME,
+        resultPrepTime.getCount() + (System.currentTimeMillis() - startTime));
+    return listBasedResult;
+  }
+
+  /**
+   * This method will scan and fill dimension and measure data
+   *
+   * @param scannedResult
+   * @param batchSize
+   * @param listBasedResult
+   * @param queryMeasures
+   */
+  protected void scanAndFillData(BlockletScannedResult scannedResult, int batchSize,
+      List<Object[]> listBasedResult, ProjectionMeasure[] queryMeasures) {
+    int numberOfPages = scannedResult.numberOfpages();
+    // loop will exit once the batchSize data has been read or the pages have been exhausted
+    while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+      int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+      if (currentPageRowCount == 0) {
+        scannedResult.incrementPageCounter();
+        continue;
+      }
+      int rowCounter = scannedResult.getRowCounter();
+      // getRowCounter holds the total number of rows processed. Calculate the
+      // left over space through getRowCounter only.
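+      // Note: the batch accounting below mirrors RawBasedResultCollector. 'availableRows' is
+      // what is still unread in the current page, at most min(batchSize, availableRows) rows
+      // are collected in this iteration, and batchSize is reduced by the rows taken so that
+      // the loop can continue on the following pages until the requested batch is filled.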
+      int availableRows = currentPageRowCount - rowCounter;
+      // rows available in the current page that can be processed from the current page
+      int availableBatchRowCount = Math.min(batchSize, availableRows);
+      // this condition will be true if no data is left in the current block/blocklet to be scanned
+      if (availableBatchRowCount < 1) {
+        break;
+      }
+      if (batchSize > availableRows) {
+        batchSize = batchSize - availableRows;
+      } else {
+        // this is done because in IUD cases the actual rows fetched can be less than the batch
+        // size as some of the rows could have been deleted. So in those cases batchSize needs
+        // to be re-initialized with the left over value
+        batchSize = 0;
+      }
+      // for every iteration of available rows, fill a newly created list of Object[] and add it
+      // to the final list so there is no mismatch in the counter while filling dimension and
+      // measure data
+      List<Object[]> collectedData = new ArrayList<>(availableBatchRowCount);
+      // fill dimension data
+      fillDimensionData(scannedResult, collectedData, queryMeasures, availableBatchRowCount);
+      fillMeasureData(scannedResult, collectedData);
+      // increment the number of rows scanned in scanned result statistics
+      // incrementScannedResultRowCounter(scannedResult, availableBatchRowCount);
+      // assign the left over rows to batch size if the number of rows fetched is lesser
+      // than batchSize
+      if (collectedData.size() < availableBatchRowCount) {
+        batchSize += availableBatchRowCount - listBasedResult.size();
+      }
+      // add the collected data to the final list
+      listBasedResult.addAll(collectedData);
+    }
+  }
+
+  private void fillDimensionData(BlockletScannedResult scannedResult,
+      List<Object[]> listBasedResult, ProjectionMeasure[] queryMeasures, int batchSize) {
+    long startTime = System.currentTimeMillis();
+    List<byte[]> dictionaryKeyArrayBatch = scannedResult.getDictionaryKeyArrayBatch(batchSize);
+    List<byte[][]> noDictionaryKeyArrayBatch =
+        scannedResult.getNoDictionaryKeyArrayBatch(batchSize);
+    List<byte[][]> complexTypeKeyArrayBatch = scannedResult.getComplexTypeKeyArrayBatch(batchSize);
+    // it will be the same for one blocklet so it can be computed only once
+    byte[] implicitColumnByteArray = scannedResult.getBlockletId()
+        .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+    // Note: the size check in the for loop is for dictionaryKeyArrayBatch as this size can be
+    // lesser than the batch size in case of IUD scenarios
+    for (int i = 0; i < dictionaryKeyArrayBatch.size(); i++) {
+      // 1 for the ByteArrayWrapper object which will contain dictionary and no dictionary data
+      // 3 for blockletId, pageId, rowId
+      Object[] row = new Object[1 + queryMeasures.length + 3];
+      scannedResult.incrementCounter();
+      row[1 + queryMeasures.length] = scannedResult.getBlockletNumber();
+      row[1 + queryMeasures.length + 1] = scannedResult.getCurrentPageCounter();
+      ByteArrayWrapper wrapper = new ByteArrayWrapper();
+      wrapper.setDictionaryKey(dictionaryKeyArrayBatch.get(i));
+      wrapper.setNoDictionaryKeys(noDictionaryKeyArrayBatch.get(i));
+      wrapper.setComplexTypesKeys(complexTypeKeyArrayBatch.get(i));
+      wrapper.setImplicitColumnByteArray(implicitColumnByteArray);
+      row[0] = wrapper;
+      row[1 + queryMeasures.length + 2] = scannedResult.getCurrentRowId();
+      listBasedResult.add(row);
+    }
+    QueryStatistic keyColumnFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME);
+    keyColumnFillingTime.addCountStatistic(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME,
+        keyColumnFillingTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+
+  private void
fillMeasureData(BlockletScannedResult scannedResult, + List listBasedResult) { + long startTime = System.currentTimeMillis(); + // if list is not empty after filling the dimension data then only fill the measure data + if (!listBasedResult.isEmpty()) { + fillMeasureDataBatch(listBasedResult, 1, scannedResult); + } + QueryStatistic measureFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.MEASURE_FILLING_TIME); + measureFillingTime.addCountStatistic(QueryStatisticsConstants.MEASURE_FILLING_TIME, + measureFillingTime.getCount() + (System.currentTimeMillis() - startTime)); + } + + private void incrementScannedResultRowCounter(BlockletScannedResult scannedResult, + int batchSize) { + // increment row counter by batch size as those many number of rows have been processed at once + scannedResult.incrementCounter(batchSize); + } +} diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java new file mode 100644 index 00000000000..c160206a1b4 --- /dev/null +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.bloom; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.Predicate; +import org.apache.hadoop.util.bloom.CarbonBloomFilter; +import org.apache.hadoop.util.bloom.Key; +import org.apache.hadoop.util.hash.Hash; + +@InterfaceAudience.Internal +public abstract class AbstractBloomDataMapWriter extends DataMapWriter { + private static final LogService LOG = LogServiceFactory.getLogService( + BloomDataMapWriter.class.getCanonicalName()); + private int bloomFilterSize; + private double bloomFilterFpp; + private boolean compressBloom; + protected int currentBlockletId; + private List currentDMFiles; + private List currentDataOutStreams; + protected List indexBloomFilters; + private KeyGenerator keyGenerator; + private ColumnarSplitter columnarSplitter; + // for the dict/sort/date column, they are encoded in MDK, + // this maps the index column name to the index in MDK + private Map indexCol2MdkIdx; + // this gives the reverse map to indexCol2MdkIdx + private Map mdkIdx2IndexCol; + + AbstractBloomDataMapWriter(String tablePath, String dataMapName, List indexColumns, + Segment segment, String shardName, SegmentProperties segmentProperties, + int bloomFilterSize, double bloomFilterFpp, boolean compressBloom) + throws IOException { + super(tablePath, dataMapName, indexColumns, segment, shardName); + this.bloomFilterSize = bloomFilterSize; + this.bloomFilterFpp = bloomFilterFpp; + this.compressBloom = compressBloom; + currentDMFiles = new ArrayList<>(indexColumns.size()); + currentDataOutStreams = new ArrayList<>(indexColumns.size()); + indexBloomFilters = new ArrayList<>(indexColumns.size()); + initDataMapFile(); + resetBloomFilters(); + + keyGenerator = segmentProperties.getDimensionKeyGenerator(); + columnarSplitter = segmentProperties.getFixedLengthKeySplitter(); + this.indexCol2MdkIdx = new HashMap<>(); + this.mdkIdx2IndexCol = new HashMap<>(); + int idx = 0; + for (final CarbonDimension dimension : segmentProperties.getDimensions()) { + if (!dimension.isGlobalDictionaryEncoding() && !dimension.isDirectDictionaryEncoding()) { + continue; + } + boolean isExistInIndex = CollectionUtils.exists(indexColumns, new Predicate() { + @Override public boolean evaluate(Object object) { + return ((CarbonColumn) 
object).getColName().equalsIgnoreCase(dimension.getColName()); + } + }); + if (isExistInIndex) { + this.indexCol2MdkIdx.put(dimension.getColName(), idx); + this.mdkIdx2IndexCol.put(idx, dimension.getColName()); + } + idx++; + } + } + + @Override + public void onBlockStart(String blockId) throws IOException { + } + + @Override + public void onBlockEnd(String blockId) throws IOException { + } + + @Override + public void onBlockletStart(int blockletId) { + } + + protected void resetBloomFilters() { + indexBloomFilters.clear(); + List indexColumns = getIndexColumns(); + int[] stats = calculateBloomStats(); + for (int i = 0; i < indexColumns.size(); i++) { + indexBloomFilters + .add(new CarbonBloomFilter(stats[0], stats[1], Hash.MURMUR_HASH, compressBloom)); + } + } + + /** + * It calculates the bits size and number of hash functions to calculate bloom. + */ + private int[] calculateBloomStats() { + /* + * n: how many items you expect to have in your filter + * p: your acceptable false positive rate + * Number of bits (m) = -n*ln(p) / (ln(2)^2) + * Number of hashes(k) = m/n * ln(2) + */ + double sizeinBits = -bloomFilterSize * Math.log(bloomFilterFpp) / (Math.pow(Math.log(2), 2)); + double numberOfHashes = sizeinBits / bloomFilterSize * Math.log(2); + int[] stats = new int[2]; + stats[0] = (int) Math.ceil(sizeinBits); + stats[1] = (int) Math.ceil(numberOfHashes); + return stats; + } + + @Override + public void onBlockletEnd(int blockletId) { + writeBloomDataMapFile(); + currentBlockletId++; + } + + @Override + public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) + throws IOException { + for (int rowId = 0; rowId < pageSize; rowId++) { + // for each indexed column, add the data to index + for (int i = 0; i < indexColumns.size(); i++) { + Object data = pages[i].getData(rowId); + addValue2BloomIndex(i, data); + } + } + } + + protected void addValue2BloomIndex(int indexColIdx, Object value) { + byte[] indexValue; + // convert measure to bytes + // convert non-dict dimensions to simple bytes without length + // convert internal-dict dimensions to simple bytes without any encode + if (indexColumns.get(indexColIdx).isMeasure()) { + if (value == null) { + value = DataConvertUtil.getNullValueForMeasure(indexColumns.get(indexColIdx).getDataType()); + } + indexValue = CarbonUtil.getValueAsBytes(indexColumns.get(indexColIdx).getDataType(), value); + } else { + if (indexColumns.get(indexColIdx).hasEncoding(Encoding.DICTIONARY) + || indexColumns.get(indexColIdx).hasEncoding(Encoding.DIRECT_DICTIONARY)) { + indexValue = convertDictionaryValue(indexColIdx, (byte[]) value); + } else { + indexValue = convertNonDictionaryValue(indexColIdx, (byte[]) value); + } + } + if (indexValue.length == 0) { + indexValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } + indexBloomFilters.get(indexColIdx).add(new Key(indexValue)); + } + + protected byte[] convertDictionaryValue(int indexColIdx, byte[] value) { + byte[] fakeMdkBytes; + // this means that we need to pad some fake bytes + // to get the whole MDK in corresponding position + if (columnarSplitter.getBlockKeySize().length > indexCol2MdkIdx.size()) { + int totalSize = 0; + for (int size : columnarSplitter.getBlockKeySize()) { + totalSize += size; + } + fakeMdkBytes = new byte[totalSize]; + + // put this bytes to corresponding position + int thisKeyIdx = indexCol2MdkIdx.get(indexColumns.get(indexColIdx).getColName()); + int destPos = 0; + for (int keyIdx = 0; keyIdx < columnarSplitter.getBlockKeySize().length; keyIdx++) { + if 
(thisKeyIdx == keyIdx) { + System.arraycopy(value, 0, + fakeMdkBytes, destPos, columnarSplitter.getBlockKeySize()[thisKeyIdx]); + break; + } + destPos += columnarSplitter.getBlockKeySize()[keyIdx]; + } + } else { + fakeMdkBytes = value; + } + // for dict columns including dictionary and date columns + // decode value to get the surrogate key + int surrogateKey = (int) keyGenerator.getKey(fakeMdkBytes, + indexCol2MdkIdx.get(indexColumns.get(indexColIdx).getColName())); + // store the dictionary key in bloom + return CarbonUtil.getValueAsBytes(DataTypes.INT, surrogateKey); + } + + protected abstract byte[] convertNonDictionaryValue(int indexColIdx, byte[] value); + + private void initDataMapFile() throws IOException { + if (!FileFactory.isFileExist(dataMapPath)) { + if (!FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) { + throw new IOException("Failed to create directory " + dataMapPath); + } + } + List indexColumns = getIndexColumns(); + for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { + String dmFile = BloomCoarseGrainDataMap.getBloomIndexFile(dataMapPath, + indexColumns.get(indexColId).getColName()); + DataOutputStream dataOutStream = null; + try { + FileFactory.createNewFile(dmFile, FileFactory.getFileType(dmFile)); + dataOutStream = FileFactory.getDataOutputStream(dmFile, + FileFactory.getFileType(dmFile)); + } catch (IOException e) { + CarbonUtil.closeStreams(dataOutStream); + throw new IOException(e); + } + + this.currentDMFiles.add(dmFile); + this.currentDataOutStreams.add(dataOutStream); + } + } + + protected void writeBloomDataMapFile() { + List indexColumns = getIndexColumns(); + try { + for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { + CarbonBloomFilter bloomFilter = indexBloomFilters.get(indexColId); + bloomFilter.setBlockletNo(currentBlockletId); + // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface. + // In lower version, we use default java serializer to write bloomfilter. 
+ bloomFilter.write(this.currentDataOutStreams.get(indexColId)); + this.currentDataOutStreams.get(indexColId).flush(); + } + } catch (Exception e) { + for (DataOutputStream dataOutputStream : currentDataOutStreams) { + CarbonUtil.closeStreams(dataOutputStream); + } + throw new RuntimeException(e); + } finally { + resetBloomFilters(); + } + } + + @Override + public void finish() throws IOException { + if (!isWritingFinished()) { + releaseResouce(); + setWritingFinished(true); + } + } + + protected void releaseResouce() { + List indexColumns = getIndexColumns(); + for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { + CarbonUtil.closeStreams( + currentDataOutStreams.get(indexColId)); + } + } + +} diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java index f7100e6b46e..f444ab54ee1 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java @@ -21,30 +21,22 @@ import java.util.List; import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.DataMapBuilder; import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.util.CarbonUtil; - -import org.apache.hadoop.util.bloom.Key; /** * Implementation for BloomFilter DataMap to rebuild the datamap for main table with existing data */ @InterfaceAudience.Internal -public class BloomDataMapBuilder extends BloomDataMapWriter implements DataMapBuilder { +public class BloomDataMapBuilder extends AbstractBloomDataMapWriter implements DataMapBuilder { BloomDataMapBuilder(String tablePath, String dataMapName, List indexColumns, Segment segment, String shardName, SegmentProperties segmentProperties, int bloomFilterSize, double bloomFilterFpp, boolean bloomCompress) throws IOException { super(tablePath, dataMapName, indexColumns, segment, shardName, segmentProperties, bloomFilterSize, bloomFilterFpp, bloomCompress); - throw new RuntimeException( - "Deferred rebuild for bloomfilter datamap is currently not supported"); } @Override @@ -63,22 +55,15 @@ public void addRow(int blockletId, int pageId, int rowId, Object[] values) { List indexColumns = getIndexColumns(); for (int i = 0; i < indexColumns.size(); i++) { Object data = values[i]; - DataType dataType = indexColumns.get(i).getDataType(); - byte[] indexValue; - if (DataTypes.STRING == dataType) { - indexValue = getStringData(data); - } else if (DataTypes.BYTE_ARRAY == dataType) { - byte[] originValue = (byte[]) data; - // String and byte array is LV encoded, L is short type - indexValue = new byte[originValue.length - 2]; - System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2); - } else { - indexValue = CarbonUtil.getValueAsBytes(dataType, data); - } - indexBloomFilters.get(i).add(new Key(indexValue)); + addValue2BloomIndex(i, data); } } + @Override + protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) { + return value; + } + @Override public void finish() throws 
IOException { if (!isWritingFinished()) { @@ -96,7 +81,7 @@ public void close() throws IOException { } @Override - protected byte[] getStringData(Object data) { - return ((String) data).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); + public boolean isIndexForCarbonRawBytes() { + return true; } } diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java index f72809ca159..27697738d85 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java @@ -16,36 +16,14 @@ */ package org.apache.carbondata.datamap.bloom; -import java.io.DataOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.Segment; -import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.keygenerator.KeyGenerator; -import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter; -import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.util.CarbonUtil; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.Predicate; -import org.apache.hadoop.util.bloom.CarbonBloomFilter; -import org.apache.hadoop.util.bloom.Key; -import org.apache.hadoop.util.hash.Hash; /** * BloomDataMap is constructed in CG level (blocklet level). @@ -55,231 +33,21 @@ * of bloom index file will be equal to that of the blocks. 
*/ @InterfaceAudience.Internal -public class BloomDataMapWriter extends DataMapWriter { - private static final LogService LOG = LogServiceFactory.getLogService( - BloomDataMapWriter.class.getCanonicalName()); - private int bloomFilterSize; - private double bloomFilterFpp; - private boolean compressBloom; - protected int currentBlockletId; - private List currentDMFiles; - private List currentDataOutStreams; - protected List indexBloomFilters; - private KeyGenerator keyGenerator; - private ColumnarSplitter columnarSplitter; - // for the dict/sort/date column, they are encoded in MDK, - // this maps the index column name to the index in MDK - private Map indexCol2MdkIdx; - // this gives the reverse map to indexCol2MdkIdx - private Map mdkIdx2IndexCol; +public class BloomDataMapWriter extends AbstractBloomDataMapWriter { BloomDataMapWriter(String tablePath, String dataMapName, List indexColumns, Segment segment, String shardName, SegmentProperties segmentProperties, int bloomFilterSize, double bloomFilterFpp, boolean compressBloom) throws IOException { - super(tablePath, dataMapName, indexColumns, segment, shardName); - this.bloomFilterSize = bloomFilterSize; - this.bloomFilterFpp = bloomFilterFpp; - this.compressBloom = compressBloom; - currentDMFiles = new ArrayList<>(indexColumns.size()); - currentDataOutStreams = new ArrayList<>(indexColumns.size()); - indexBloomFilters = new ArrayList<>(indexColumns.size()); - initDataMapFile(); - resetBloomFilters(); - - keyGenerator = segmentProperties.getDimensionKeyGenerator(); - columnarSplitter = segmentProperties.getFixedLengthKeySplitter(); - this.indexCol2MdkIdx = new HashMap<>(); - this.mdkIdx2IndexCol = new HashMap<>(); - int idx = 0; - for (final CarbonDimension dimension : segmentProperties.getDimensions()) { - if (!dimension.isGlobalDictionaryEncoding() && !dimension.isDirectDictionaryEncoding()) { - continue; - } - boolean isExistInIndex = CollectionUtils.exists(indexColumns, new Predicate() { - @Override public boolean evaluate(Object object) { - return ((CarbonColumn) object).getColName().equalsIgnoreCase(dimension.getColName()); - } - }); - if (isExistInIndex) { - this.indexCol2MdkIdx.put(dimension.getColName(), idx); - this.mdkIdx2IndexCol.put(idx, dimension.getColName()); - } - idx++; - } - } - - @Override - public void onBlockStart(String blockId) throws IOException { - } - - @Override - public void onBlockEnd(String blockId) throws IOException { - } - - @Override - public void onBlockletStart(int blockletId) { + super(tablePath, dataMapName, indexColumns, segment, shardName, segmentProperties, + bloomFilterSize, bloomFilterFpp, compressBloom); } - protected void resetBloomFilters() { - indexBloomFilters.clear(); - List indexColumns = getIndexColumns(); - int[] stats = calculateBloomStats(); - for (int i = 0; i < indexColumns.size(); i++) { - indexBloomFilters - .add(new CarbonBloomFilter(stats[0], stats[1], Hash.MURMUR_HASH, compressBloom)); + protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) { + if (DataTypes.VARCHAR == indexColumns.get(indexColIdx).getDataType()) { + return DataConvertUtil.getRawBytesForVarchar(value); + } else { + return DataConvertUtil.getRawBytes(value); } } - - /** - * It calculates the bits size and number of hash functions to calculate bloom. 
- */ - private int[] calculateBloomStats() { - /* - * n: how many items you expect to have in your filter - * p: your acceptable false positive rate - * Number of bits (m) = -n*ln(p) / (ln(2)^2) - * Number of hashes(k) = m/n * ln(2) - */ - double sizeinBits = -bloomFilterSize * Math.log(bloomFilterFpp) / (Math.pow(Math.log(2), 2)); - double numberOfHashes = sizeinBits / bloomFilterSize * Math.log(2); - int[] stats = new int[2]; - stats[0] = (int) Math.ceil(sizeinBits); - stats[1] = (int) Math.ceil(numberOfHashes); - return stats; - } - - @Override - public void onBlockletEnd(int blockletId) { - writeBloomDataMapFile(); - currentBlockletId++; - } - - @Override - public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) { - for (int rowId = 0; rowId < pageSize; rowId++) { - // for each indexed column, add the data to bloom filter - for (int i = 0; i < indexColumns.size(); i++) { - Object data = pages[i].getData(rowId); - DataType dataType = indexColumns.get(i).getDataType(); - byte[] indexValue; - // convert measure to bytes - // convert non-dict dimensions to simple bytes without length - // convert internal-dict dimensions to simple bytes without any encode - if (indexColumns.get(i).isMeasure()) { - indexValue = CarbonUtil.getValueAsBytes(dataType, data); - } else { - if (indexColumns.get(i).hasEncoding(Encoding.DICTIONARY) - || indexColumns.get(i).hasEncoding(Encoding.DIRECT_DICTIONARY)) { - byte[] mdkBytes; - // this means that we need to pad some fake bytes - // to get the whole MDK in corresponding position - if (columnarSplitter.getBlockKeySize().length > indexCol2MdkIdx.size()) { - int totalSize = 0; - for (int size : columnarSplitter.getBlockKeySize()) { - totalSize += size; - } - mdkBytes = new byte[totalSize]; - int startPos = 0; - int destPos = 0; - for (int keyIdx = 0; keyIdx < columnarSplitter.getBlockKeySize().length; keyIdx++) { - if (mdkIdx2IndexCol.containsKey(keyIdx)) { - int size = columnarSplitter.getBlockKeySize()[keyIdx]; - System.arraycopy(data, startPos, mdkBytes, destPos, size); - startPos += size; - } - destPos += columnarSplitter.getBlockKeySize()[keyIdx]; - } - } else { - mdkBytes = (byte[]) data; - } - // for dict columns including dictionary and date columns - // decode value to get the surrogate key - int surrogateKey = (int) keyGenerator.getKey(mdkBytes, - indexCol2MdkIdx.get(indexColumns.get(i).getColName())); - // store the dictionary key in bloom - indexValue = CarbonUtil.getValueAsBytes(DataTypes.INT, surrogateKey); - } else if (DataTypes.VARCHAR == dataType) { - indexValue = DataConvertUtil.getRawBytesForVarchar((byte[]) data); - } else { - indexValue = DataConvertUtil.getRawBytes((byte[]) data); - } - } - if (indexValue.length == 0) { - indexValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; - } - indexBloomFilters.get(i).add(new Key(indexValue)); - } - } - } - - protected byte[] getStringData(Object data) { - byte[] lvData = (byte[]) data; - byte[] indexValue = new byte[lvData.length - 2]; - System.arraycopy(lvData, 2, indexValue, 0, lvData.length - 2); - return indexValue; - } - - private void initDataMapFile() throws IOException { - if (!FileFactory.isFileExist(dataMapPath)) { - if (!FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) { - throw new IOException("Failed to create directory " + dataMapPath); - } - } - List indexColumns = getIndexColumns(); - for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { - String dmFile = 
BloomCoarseGrainDataMap.getBloomIndexFile(dataMapPath, - indexColumns.get(indexColId).getColName()); - DataOutputStream dataOutStream = null; - try { - FileFactory.createNewFile(dmFile, FileFactory.getFileType(dmFile)); - dataOutStream = FileFactory.getDataOutputStream(dmFile, - FileFactory.getFileType(dmFile)); - } catch (IOException e) { - CarbonUtil.closeStreams(dataOutStream); - throw new IOException(e); - } - - this.currentDMFiles.add(dmFile); - this.currentDataOutStreams.add(dataOutStream); - } - } - - protected void writeBloomDataMapFile() { - List indexColumns = getIndexColumns(); - try { - for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { - CarbonBloomFilter bloomFilter = indexBloomFilters.get(indexColId); - bloomFilter.setBlockletNo(currentBlockletId); - // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface. - // In lower version, we use default java serializer to write bloomfilter. - bloomFilter.write(this.currentDataOutStreams.get(indexColId)); - this.currentDataOutStreams.get(indexColId).flush(); - } - } catch (Exception e) { - for (DataOutputStream dataOutputStream : currentDataOutStreams) { - CarbonUtil.closeStreams(dataOutputStream); - } - throw new RuntimeException(e); - } finally { - resetBloomFilters(); - } - } - - @Override - public void finish() throws IOException { - if (!isWritingFinished()) { - releaseResouce(); - setWritingFinished(true); - } - } - - protected void releaseResouce() { - List indexColumns = getIndexColumns(); - for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { - CarbonUtil.closeStreams( - currentDataOutStreams.get(indexColId)); - } - } - } diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java index eb702201362..7081fa480dd 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java @@ -172,4 +172,8 @@ public void close() throws IOException { } } + @Override + public boolean isIndexForCarbonRawBytes() { + return false; + } } diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala index 0cafe337fda..c615078e6d3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala @@ -293,6 +293,14 @@ class TestDataMapFactory( override def finish(): Unit = { } override def close(): Unit = { } + + /** + * whether create index on internal carbon bytes (such as dictionary encoded) or original + * value + */ + override def isIndexForCarbonRawBytes: Boolean = { + false + } } } } diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala index c1c20d837a5..05f472fa780 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala @@ -33,13 +33,17 @@ 
import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager, Segment} import org.apache.carbondata.core.datamap.dev.DataMapBuilder +import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory +import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher +import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn +import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.TaskMetricsMap import org.apache.carbondata.core.util.path.CarbonTablePath @@ -78,7 +82,7 @@ object IndexDataMapRebuildRDD { validSegments.asScala.foreach { segment => // if lucene datamap folder is exists, not require to build lucene datamap again refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName, - indexedCarbonColumns, segment.getSegmentNo); + indexedCarbonColumns, segment); } val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession, tableIdentifier) @@ -90,10 +94,10 @@ object IndexDataMapRebuildRDD { carbonTable: CarbonTable, dataMapName: String, indexColumns: java.util.List[CarbonColumn], - segmentId: String): Unit = { + segment: Segment): Unit = { - val dataMapStorePath = - CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, segmentId, dataMapName) + val dataMapStorePath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, + segment.getSegmentNo, dataMapName) if (!FileFactory.isFileExist(dataMapStorePath)) { if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) { @@ -104,19 +108,19 @@ object IndexDataMapRebuildRDD { carbonTable.getTableInfo, dataMapName, indexColumns.asScala.toArray, - segmentId + segment ).collect() status.find(_._2 == false).foreach { task => throw new Exception( - s"Task Failed to rebuild datamap $dataMapName on segment_$segmentId") + s"Task Failed to rebuild datamap $dataMapName on segment_${segment.getSegmentNo}") } } catch { case ex: Throwable => // process failure FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath)) throw new Exception( - s"Failed to refresh datamap $dataMapName on segment_$segmentId", ex) + s"Failed to refresh datamap $dataMapName on segment_${segment.getSegmentNo}", ex) } } else { throw new IOException(s"Failed to create directory $dataMapStorePath") @@ -144,13 +148,113 @@ class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[ } } +/** + * This class will generate row value which is raw bytes for the dimensions. 
+ */ +class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Array[CarbonColumn]) + extends CarbonReadSupport[Array[Object]] { + var columnarSplitter: ColumnarSplitter = _ + // for the non dictionary dimensions + var indexCol2IdxInNoDictArray: Map[String, Int] = Map() + // for the measures + var indexCol2IdxInMeasureArray: Map[String, Int] = Map() + // for the dictionary/date dimensions + var dictIndexCol2MdkIndex: Map[String, Int] = Map() + var mdkIndex2DictIndexCol: Map[Int, String] = Map() + var existDim = false + + override def initialize(carbonColumns: Array[CarbonColumn], + carbonTable: CarbonTable): Unit = { + this.columnarSplitter = segmentProperties.getFixedLengthKeySplitter + + indexColumns.foreach { col => + if (col.isDimension) { + val dim = carbonTable.getDimensionByName(carbonTable.getTableName, col.getColName) + if (!dim.isGlobalDictionaryEncoding && !dim.isDirectDictionaryEncoding) { + indexCol2IdxInNoDictArray = + indexCol2IdxInNoDictArray + (col.getColName -> indexCol2IdxInNoDictArray.size) + } + } else { + indexCol2IdxInMeasureArray = + indexCol2IdxInMeasureArray + (col.getColName -> indexCol2IdxInMeasureArray.size) + } + } + dictIndexCol2MdkIndex = segmentProperties.getDimensions.asScala + .filter(col => col.isGlobalDictionaryEncoding || col.isDirectDictionaryEncoding) + .map(_.getColName) + .zipWithIndex + .filter(p => indexColumns.exists(c => c.getColName.equalsIgnoreCase(p._1))) + .toMap + mdkIndex2DictIndexCol = dictIndexCol2MdkIndex.map(p => (p._2, p._1)) + existDim = indexCol2IdxInNoDictArray.nonEmpty || dictIndexCol2MdkIndex.nonEmpty + } + + /** + * input: all the dimensions are bundled in one ByteArrayWrapper in position 0, + * then comes the measures one by one; + * output: all the dimensions and measures comes one after another + */ + override def readRow(data: Array[Object]): Array[Object] = { + val dictArray = if (existDim) { + val dictKeys = data(0).asInstanceOf[ByteArrayWrapper].getDictionaryKey + // note that the index column may only contains a portion of all the dict columns, so we + // need to pad fake bytes to dict keys in order to reconstruct value later + if (columnarSplitter.getBlockKeySize.length > dictIndexCol2MdkIndex.size) { + val res = new Array[Byte](columnarSplitter.getBlockKeySize.sum) + var startPos = 0 + var desPos = 0 + columnarSplitter.getBlockKeySize.indices.foreach { idx => + if (mdkIndex2DictIndexCol.contains(idx)) { + val size = columnarSplitter.getBlockKeySize.apply(idx) + System.arraycopy(dictKeys, startPos, res, desPos, size) + startPos += size + } + desPos += columnarSplitter.getBlockKeySize.apply(idx) + } + + Option(res) + } else { + Option(dictKeys) + } + } else { + None + } + + val dictKeys = if (existDim) { + Option(columnarSplitter.splitKey(dictArray.get)) + } else { + None + } + val rtn = new Array[Object](indexColumns.length + 3) + + indexColumns.zipWithIndex.foreach { case (col, i) => + rtn(i) = if (dictIndexCol2MdkIndex.contains(col.getColName)) { + dictKeys.get(dictIndexCol2MdkIndex.get(col.getColName).get) + } else if (indexCol2IdxInNoDictArray.contains(col.getColName)) { + data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex( + indexCol2IdxInNoDictArray.apply(col.getColName)) + } else { + // measures start from 1 + data(1 + indexCol2IdxInMeasureArray.apply(col.getColName)) + } + } + rtn(indexColumns.length) = data(data.length - 3) + rtn(indexColumns.length + 1) = data(data.length - 2) + rtn(indexColumns.length + 2) = data(data.length - 1) + rtn + } + + override def close(): Unit = { 
+ } +} + class IndexDataMapRebuildRDD[K, V]( session: SparkSession, result: RefreshResult[K, V], @transient tableInfo: TableInfo, dataMapName: String, indexColumns: Array[CarbonColumn], - segmentId: String + segment: Segment ) extends CarbonRDDWithTableInfo[(K, V)]( session.sparkContext, Nil, tableInfo.serialize()) { @@ -163,9 +267,9 @@ class IndexDataMapRebuildRDD[K, V]( override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val dataMapFactory = - DataMapManager.get().getDataMapProvider( - CarbonTable.buildFromTableInfo(getTableInfo), dataMapSchema, session).getDataMapFactory + val carbonTable = CarbonTable.buildFromTableInfo(getTableInfo) + val dataMapFactory = DataMapManager.get().getDataMapProvider( + carbonTable, dataMapSchema, session).getDataMapFactory var status = false val inputMetrics = new CarbonInputMetrics TaskMetricsMap.getInstance().registerThreadCallback() @@ -180,21 +284,31 @@ class IndexDataMapRebuildRDD[K, V]( // one query id per table model.setQueryId(queryId) model.setVectorReader(false) - model.setForcedDetailRawQuery(false) model.setRequiredRowId(true) var reader: CarbonRecordReader[Array[Object]] = null var refresher: DataMapBuilder = null try { - reader = new CarbonRecordReader( - model, new OriginalReadSupport(indexColumns.map(_.getDataType)), inputMetrics) - reader.initialize(inputSplit, attemptContext) + val segmentPropertiesFetcher = DataMapStoreManager.getInstance().getDataMap(carbonTable, + BlockletDataMapFactory.DATA_MAP_SCHEMA) + .getDataMapFactory + .asInstanceOf[SegmentPropertiesFetcher] + val segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment) // we use task name as shard name to create the folder for this datamap val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath) - refresher = dataMapFactory.createBuilder(new Segment(segmentId), shardName, null) + refresher = dataMapFactory.createBuilder(segment, shardName, segmentProperties) refresher.initialize() + model.setForcedDetailRawQuery(refresher.isIndexForCarbonRawBytes) + val readSupport = if (refresher.isIndexForCarbonRawBytes) { + new RawBytesReadSupport(segmentProperties, indexColumns) + } else { + new OriginalReadSupport(indexColumns.map(_.getDataType)) + } + reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics) + reader.initialize(inputSplit, attemptContext) + var blockletId = 0 var firstRow = true while (reader.nextKeyValue()) { @@ -269,7 +383,7 @@ class IndexDataMapRebuildRDD[K, V]( CarbonInputFormat.setSegmentsToAccess( conf, - Segment.toSegmentList(Array(segmentId), null)) + Segment.toSegmentList(Array(segment.getSegmentNo), null)) CarbonInputFormat.setColumnProjection( conf, @@ -291,7 +405,7 @@ class IndexDataMapRebuildRDD[K, V]( CarbonInputFormat.setSegmentsToAccess( job.getConfiguration, - Segment.toSegmentList(Array(segmentId), null)) + Segment.toSegmentList(Array(segment.getSegmentNo), null)) CarbonInputFormat.setTableInfo( job.getConfiguration, diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala index 9e19ac23032..d0e2cbdd8ca 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala +++ 
b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala @@ -454,6 +454,311 @@ class BloomCoarseGrainDataMapFunctionSuite extends QueryTest with BeforeAndAfte sql(s"SELECT * FROM $normalTable WHERE c3 = 'xxx'")) } + test("test rebuild bloom datamap: index column is integer, dictionary, sort_column") { + sql( + s""" + | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' + | TBLPROPERTIES('table_blocksize'='128') + | """.stripMargin) + sql( + s""" + | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' + | TBLPROPERTIES('table_blocksize'='128', 'dictionary_include'='id, name, s1', 'sort_columns'='id') + | """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable + | OPTIONS('header'='false') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable + | OPTIONS('header'='false') + """.stripMargin) + + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable + | USING 'bloomfilter' + | DMProperties('INDEX_COLUMNS'='city,id,age,name', 'BLOOM_SIZE'='640000') + """.stripMargin) + + sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false) + checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName) + checkBasicQuery(dataMapName, bloomDMSampleTable, normalTable) + sql(s"DROP TABLE IF EXISTS $normalTable") + sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") + } + + test("test rebuild bloom datamap: index column is integer, dictionary, not sort_column") { + sql( + s""" + | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' + | TBLPROPERTIES('table_blocksize'='128') + | """.stripMargin) + sql( + s""" + | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' + | TBLPROPERTIES('table_blocksize'='128', 'dictionary_include'='id, name, s1', 'sort_columns'='name') + | """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable + | OPTIONS('header'='false') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable + | OPTIONS('header'='false') + """.stripMargin) + + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable + | USING 'bloomfilter' + | DMProperties('INDEX_COLUMNS'='city,id,age,name', 'BLOOM_SIZE'='640000') + """.stripMargin) + + sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false) + checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName) + checkBasicQuery(dataMapName, bloomDMSampleTable, normalTable) + sql(s"DROP TABLE IF EXISTS $normalTable") + sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") + } + + test("test rebuild bloom datamap: index column is integer, sort_column") { + sql( + s""" + | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' + | TBLPROPERTIES('table_blocksize'='128') + | 
""".stripMargin) + sql( + s""" + | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' + | TBLPROPERTIES('table_blocksize'='128', 'dictionary_include'='name, s1', 'sort_columns'='id') + | """.stripMargin) + + sql( + s""" + | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $normalTable + | OPTIONS('header'='false') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$bigFile' INTO TABLE $bloomDMSampleTable + | OPTIONS('header'='false') + """.stripMargin) + + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable + | USING 'bloomfilter' + | DMProperties('INDEX_COLUMNS'='city,id,age,name', 'BLOOM_SIZE'='640000') + """.stripMargin) + + sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false) + checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName) + checkBasicQuery(dataMapName, bloomDMSampleTable, normalTable) + sql(s"DROP TABLE IF EXISTS $normalTable") + sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") + } + + test("test rebuild bloom datamap: index column is float, not dictionary") { + val floatCsvPath = s"$resourcesPath/datasamplefordate.csv" + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") + sql( + s""" + | CREATE TABLE $normalTable(empno string, doj date, salary float) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='empno') + """.stripMargin) + sql( + s""" + | CREATE TABLE $bloomDMSampleTable(empno string, doj date, salary float) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='empno') + """.stripMargin) + sql( + s""" + | LOAD DATA INPATH '$floatCsvPath' INTO TABLE $normalTable OPTIONS( + | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE') + """.stripMargin) + sql( + s""" + | LOAD DATA INPATH '$floatCsvPath' INTO TABLE $bloomDMSampleTable OPTIONS( + | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' DMPROPERTIES ( + | 'INDEX_COLUMNS'='salary') + """.stripMargin) + sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040.56'").show(false) + sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040'").show(false) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040.56'"), + sql(s"SELECT * FROM $normalTable WHERE salary='1040.56'")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040'"), + sql(s"SELECT * FROM $normalTable WHERE salary='1040'")) + } + + test("test rebuild bloom datamap: index column is float, dictionary") { + val floatCsvPath = s"$resourcesPath/datasamplefordate.csv" + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") + sql( + s""" + | CREATE TABLE $normalTable(empno string, doj date, salary float) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='empno') + """.stripMargin) + sql( + s""" + | CREATE TABLE $bloomDMSampleTable(empno string, doj date, salary float) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='empno', 'DICTIONARY_INCLUDE'='salary') + """.stripMargin) + sql( + s""" + | LOAD DATA INPATH '$floatCsvPath' INTO TABLE $normalTable OPTIONS( + | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE') + """.stripMargin) + sql( + s""" + | LOAD DATA INPATH '$floatCsvPath' INTO TABLE $bloomDMSampleTable OPTIONS( + | 'DELIMITER'=',', 
'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' DMPROPERTIES ( + | 'INDEX_COLUMNS'='salary') + """.stripMargin) + sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040.56'").show(false) + sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040'").show(false) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040.56'"), + sql(s"SELECT * FROM $normalTable WHERE salary='1040.56'")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE salary='1040'"), + sql(s"SELECT * FROM $normalTable WHERE salary='1040'")) + } + + test("test rebuild bloom datamap: index column is date") { + val dateCsvPath = s"$resourcesPath/datasamplefordate.csv" + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") + sql( + s""" + | CREATE TABLE $normalTable(empno string, doj date, salary float) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='empno') + """.stripMargin) + sql( + s""" + | CREATE TABLE $bloomDMSampleTable(empno string, doj date, salary float) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='empno') + """.stripMargin) + sql( + s""" + | LOAD DATA INPATH '$dateCsvPath' INTO TABLE $normalTable OPTIONS( + | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE') + """.stripMargin) + sql( + s""" + | LOAD DATA INPATH '$dateCsvPath' INTO TABLE $bloomDMSampleTable OPTIONS( + | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' DMPROPERTIES ( + | 'INDEX_COLUMNS'='doj') + """.stripMargin) + sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-14'").show(false) + sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-15'").show(false) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-14'"), + sql(s"SELECT * FROM $normalTable WHERE doj='2016-03-14'")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-15'"), + sql(s"SELECT * FROM $normalTable WHERE doj='2016-03-15'")) + } + + test("test rebuild bloom datamap: index column is date, dictionary, sort_colum") { + val dateCsvPath = s"$resourcesPath/datasamplefordate.csv" + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") + sql( + s""" + | CREATE TABLE $normalTable(empno string, doj date, salary float) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='empno') + """.stripMargin) + sql( + s""" + | CREATE TABLE $bloomDMSampleTable(empno string, doj date, salary float) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='empno,doj', 'DICTIONARY_INCLUDE'='doj') + """.stripMargin) + sql( + s""" + | LOAD DATA INPATH '$dateCsvPath' INTO TABLE $normalTable OPTIONS( + | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE') + """.stripMargin) + sql( + s""" + | LOAD DATA INPATH '$dateCsvPath' INTO TABLE $bloomDMSampleTable OPTIONS( + | 'DELIMITER'=',', 'QUOTECHAR'='"', 'BAD_RECORDS_ACTION'='FORCE') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable USING 'bloomfilter' DMPROPERTIES ( + | 'INDEX_COLUMNS'='doj') + """.stripMargin) + sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-14'").show(false) + sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-15'").show(false) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-14'"), + 
sql(s"SELECT * FROM $normalTable WHERE doj='2016-03-14'")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE doj='2016-03-15'"), + sql(s"SELECT * FROM $normalTable WHERE doj='2016-03-15'")) + } + + ignore("test rebuild bloom datamap: loading and querying with empty values on index column") { + sql(s"CREATE TABLE $normalTable(c1 string, c2 int, c3 string) STORED BY 'carbondata'") + sql(s"CREATE TABLE $bloomDMSampleTable(c1 string, c2 int, c3 string) STORED BY 'carbondata'") + + // load data with empty value + sql(s"INSERT INTO $normalTable SELECT '', 1, 'xxx'") + sql(s"INSERT INTO $bloomDMSampleTable SELECT '', 1, 'xxx'") + sql(s"INSERT INTO $normalTable SELECT '', null, 'xxx'") + sql(s"INSERT INTO $bloomDMSampleTable SELECT '', null, 'xxx'") + + sql( + s""" + | CREATE DATAMAP $dataMapName on table $bloomDMSampleTable + | using 'bloomfilter' + | DMPROPERTIES('index_columns'='c1, c2') + """.stripMargin) + + // query on null fields + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable"), + sql(s"SELECT * FROM $normalTable")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE c1 = null"), + sql(s"SELECT * FROM $normalTable WHERE c1 = null")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE c1 = ''"), + sql(s"SELECT * FROM $normalTable WHERE c1 = ''")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE isNull(c1)"), + sql(s"SELECT * FROM $normalTable WHERE isNull(c1)")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE isNull(c2)"), + sql(s"SELECT * FROM $normalTable WHERE isNull(c2)")) + } + override def afterAll(): Unit = { deleteFile(bigFile) sql(s"DROP TABLE IF EXISTS $normalTable") diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala index 836c8e73ba7..14f89668f12 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala @@ -139,7 +139,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } - ignore("test create bloom datamap and REBUILD DATAMAP") { + test("test create bloom datamap and REBUILD DATAMAP") { sql( s""" | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, @@ -181,7 +181,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } - ignore("test create bloom datamap with DEFERRED REBUILD, query hit datamap") { + test("test create bloom datamap with DEFERRED REBUILD, query hit datamap") { sql( s""" | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, @@ -259,7 +259,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } - ignore("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") { + test("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") { sql( s""" | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,