From f29ec1d80acea6fcb85a55ab37c02847e9282e5b Mon Sep 17 00:00:00 2001 From: xuchuanyin Date: Wed, 29 Aug 2018 20:26:38 +0800 Subject: [PATCH] Fix bugs in MinMaxDataMap make minmax datamap useable and add more tests for it --- .../page/statistics/DummyStatsCollector.java | 4 + .../statistics/KeyPageStatsCollector.java | 5 + .../statistics/LVStringStatsCollector.java | 5 + .../PrimitivePageStatsCollector.java | 8 + .../page/statistics/SimpleStatsResult.java | 4 + .../datamap/examples/MinMaxDataWriter.java | 286 ------------ .../examples/MinMaxIndexBlockDetails.java | 64 --- .../datamap/examples/MinMaxIndexDataMap.java | 178 -------- .../examples/MinMaxIndexDataMapFactory.java | 171 -------- .../minmax/AbstractMinMaxDataMapWriter.java | 231 ++++++++++ .../datamap/minmax/MinMaxDataMapBuilder.java | 101 +++++ .../datamap/minmax/MinMaxDataMapCache.java | 142 ++++++ .../minmax/MinMaxDataMapCacheKeyValue.java | 82 ++++ .../minmax/MinMaxDataMapDirectWriter.java | 51 +++ .../MinMaxDataMapDistributable.java} | 30 +- .../datamap/minmax/MinMaxDataMapFactory.java | 367 ++++++++++++++++ .../datamap/minmax/MinMaxDataMapModel.java | 35 ++ .../datamap/minmax/MinMaxIndexDataMap.java | 122 ++++++ .../datamap/minmax/MinMaxIndexHolder.java | 150 +++++++ .../minmax/MinMaxDataMapFunctionSuite.scala | 408 ++++++++++++++++++ .../datamap/minmax/MinMaxDataMapSuite.scala | 8 +- 21 files changed, 1731 insertions(+), 721 deletions(-) delete mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java delete mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java delete mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java delete mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java create mode 100644 
datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java create mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapBuilder.java create mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapCache.java create mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapCacheKeyValue.java create mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapDirectWriter.java rename datamap/examples/src/main/java/org/apache/carbondata/datamap/{examples/BlockletMinMax.java => minmax/MinMaxDataMapDistributable.java} (62%) create mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapFactory.java create mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapModel.java create mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxIndexDataMap.java create mode 100644 datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxIndexHolder.java create mode 100644 integration/spark2/src/test/scala/org/apache/carbondata/datamap/minmax/MinMaxDataMapFunctionSuite.scala diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java index a8bc5f16b79..d9b532f2e40 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java @@ -48,6 +48,10 @@ public class DummyStatsCollector implements ColumnPageStatsCollector { return BYTE_ARRAY; } + @Override + public void clear() { + + } }; @Override public void updateNull(int rowId) { diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java index 22537db228f..28320504000 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java @@ -107,6 +107,11 @@ public SimpleStatsResult getPageStats() { return dataType; } + @Override + public void clear() { + min = null; + max = null; + } }; } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java index e1ac6769802..8a35cac513a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java @@ -105,6 +105,11 @@ public SimpleStatsResult getPageStats() { return DataTypes.STRING; } + @Override + public void clear() { + min = null; + max = null; + } }; } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java index bbac772f622..2ab04203359 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java @@ -120,6 +120,10 @@ public static PrimitivePageStatsCollector newInstance(ValueEncoderMeta meta) { private PrimitivePageStatsCollector(DataType dataType) { this.dataType = dataType; + resetPageStats(); + } + + private void resetPageStats() { if (dataType == DataTypes.BOOLEAN) 
{ minByte = TRUE_VALUE; maxByte = FALSE_VALUE; @@ -343,4 +347,8 @@ public DataType getDataType() { return dataType; } + @Override + public void clear() { + resetPageStats(); + } } \ No newline at end of file diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java index 0e1f6503856..01d883b1ce1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java @@ -29,4 +29,8 @@ public interface SimpleStatsResult { DataType getDataType(); + /** + * clear the statistics info for resuse + */ + void clear(); } diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java deleted file mode 100644 index 09932184898..00000000000 --- a/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.datamap.examples; - -import java.io.BufferedWriter; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datamap.Segment; -import org.apache.carbondata.core.datamap.dev.DataMapWriter; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.util.ByteUtil; -import org.apache.carbondata.core.util.CarbonUtil; - -import com.google.gson.Gson; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -public class MinMaxDataWriter extends DataMapWriter { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(TableInfo.class.getName()); - - private Object[] pageLevelMin, pageLevelMax; - - private Map blockMinMaxMap; - - private int columnCnt; - private DataType[] dataTypeArray; - private String indexShardName; - - /** - * Since the sequence of indexed columns is defined the same as order in user-created, so - * map colIdx in user-created to colIdx in MinMaxIndex. 
- * Please note that the sequence of min-max values for each column in blocklet-min-max is not - * the same as indexed columns, so we need to reorder the origin while writing the min-max values - */ - private Map origin2MinMaxOrdinal = new HashMap<>(); - - public MinMaxDataWriter(CarbonTable carbonTable, DataMapSchema dataMapSchema, Segment segment, - String shardName, List indexColumns) { - super(carbonTable.getTablePath(), dataMapSchema.getDataMapName(), indexColumns, segment, - shardName); - this.columnCnt = indexColumns.size(); - for (CarbonColumn col : indexColumns) { - this.origin2MinMaxOrdinal.put(col.getSchemaOrdinal(), col.getOrdinal()); - } - if (this.dataTypeArray == null) { - this.dataTypeArray = new DataType[this.columnCnt]; - for (int i = 0; i < this.columnCnt; i++) { - this.dataTypeArray[i] = indexColumns.get(i).getDataType(); - } - } - } - - @Override public void onBlockStart(String blockId) { - if (blockMinMaxMap == null) { - blockMinMaxMap = new HashMap<>(); - } - } - - @Override public void onBlockEnd(String blockId) { - } - - @Override public void onBlockletStart(int blockletId) { - pageLevelMin = new Object[columnCnt]; - pageLevelMax = new Object[columnCnt]; - } - - @Override public void onBlockletEnd(int blockletId) { - updateCurrentBlockletMinMax(blockletId); - } - - @Override - public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) { - // as an example, we don't use page-level min-max generated by native carbondata here, we get - // the min-max by comparing each row - for (int rowId = 0; rowId < pageSize; rowId++) { - for (int colIdx = 0; colIdx < columnCnt; colIdx++) { - Object originValue = pages[colIdx].getData(rowId); - DataType dataType = dataTypeArray[colIdx]; - // for string & bytes_array, data is prefixed with length, need to remove it - if (DataTypes.STRING == dataType || DataTypes.BYTE_ARRAY == dataType) { - byte[] valueMin0 = (byte[]) pageLevelMin[colIdx]; - byte[] valueMax0 = (byte[]) 
pageLevelMax[colIdx]; - byte[] value1 = (byte[]) originValue; - if (pageLevelMin[colIdx] == null || ByteUtil.UnsafeComparer.INSTANCE - .compareTo(valueMin0, 0, valueMin0.length, value1, 2, value1.length - 2) > 0) { - pageLevelMin[colIdx] = new byte[value1.length - 2]; - System.arraycopy(value1, 2, (byte[]) pageLevelMin[colIdx], 0, value1.length - 2); - } - if (pageLevelMax[colIdx] == null || ByteUtil.UnsafeComparer.INSTANCE - .compareTo(valueMax0, 0, valueMax0.length, value1, 2, value1.length - 2) < 0) { - pageLevelMax[colIdx] = new byte[value1.length - 2]; - System.arraycopy(value1, 2, (byte[]) pageLevelMax[colIdx], 0, value1.length - 2); - } - } else if (DataTypes.INT == dataType) { - updateMinMax(colIdx, originValue, dataType); - } else { - throw new UnsupportedOperationException("Not implement yet"); - } - } - } - } - - private void updateMinMax(int colIdx, Object originValue, DataType dataType) { - if (pageLevelMin[colIdx] == null) { - pageLevelMin[colIdx] = originValue; - } - if (pageLevelMax[colIdx] == null) { - pageLevelMax[colIdx] = originValue; - } - - if (DataTypes.SHORT == dataType) { - if (pageLevelMin[colIdx] == null || (short) pageLevelMin[colIdx] - (short) originValue > 0) { - pageLevelMin[colIdx] = originValue; - } - if (pageLevelMax[colIdx] == null || (short) pageLevelMax[colIdx] - (short) originValue < 0) { - pageLevelMax[colIdx] = originValue; - } - } else if (DataTypes.INT == dataType) { - if (pageLevelMin[colIdx] == null || (int) pageLevelMin[colIdx] - (int) originValue > 0) { - pageLevelMin[colIdx] = originValue; - } - if (pageLevelMax[colIdx] == null || (int) pageLevelMax[colIdx] - (int) originValue < 0) { - pageLevelMax[colIdx] = originValue; - } - } else if (DataTypes.LONG == dataType) { - if (pageLevelMin[colIdx] == null || (long) pageLevelMin[colIdx] - (long) originValue > 0) { - pageLevelMin[colIdx] = originValue; - } - if (pageLevelMax[colIdx] == null || (long) pageLevelMax[colIdx] - (long) originValue < 0) { - pageLevelMax[colIdx] = 
originValue; - } - } else if (DataTypes.DOUBLE == dataType) { - if (pageLevelMin[colIdx] == null - || (double) pageLevelMin[colIdx] - (double) originValue > 0) { - pageLevelMin[colIdx] = originValue; - } - if (pageLevelMax[colIdx] == null - || (double) pageLevelMax[colIdx] - (double) originValue < 0) { - pageLevelMax[colIdx] = originValue; - } - } else { - // todo: - throw new RuntimeException("Not implemented yet"); - } - } - - private void updateCurrentBlockletMinMax(int blockletId) { - byte[][] max = new byte[this.columnCnt][]; - byte[][] min = new byte[this.columnCnt][]; - for (int i = 0; i < this.columnCnt; i++) { - int targetColIdx = origin2MinMaxOrdinal.get(i); - max[targetColIdx] = CarbonUtil.getValueAsBytes(this.dataTypeArray[i], pageLevelMax[i]); - min[targetColIdx] = CarbonUtil.getValueAsBytes(this.dataTypeArray[i], pageLevelMin[i]); - } - - BlockletMinMax blockletMinMax = new BlockletMinMax(); - blockletMinMax.setMax(max); - blockletMinMax.setMin(min); - blockMinMaxMap.put(blockletId, blockletMinMax); - } - - - public void updateMinMaxIndex(String blockId) { - constructMinMaxIndex(blockId); - } - - /** - * Construct the Min Max Index. - * @param blockId - */ - public void constructMinMaxIndex(String blockId) { - // construct Min and Max values of each Blocklets present inside a block. - List tempMinMaxIndexBlockDetails = null; - tempMinMaxIndexBlockDetails = loadBlockDetails(); - try { - writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockId); - } catch (IOException ex) { - LOGGER.info(" Unable to write the file"); - } - } - - /** - * loadBlockDetails into the MinMaxIndexBlockDetails class. 
- */ - private List loadBlockDetails() { - List minMaxIndexBlockDetails = new ArrayList(); - - for (int index = 0; index < blockMinMaxMap.size(); index++) { - MinMaxIndexBlockDetails tmpminMaxIndexBlockDetails = new MinMaxIndexBlockDetails(); - tmpminMaxIndexBlockDetails.setMinValues(blockMinMaxMap.get(index).getMin()); - tmpminMaxIndexBlockDetails.setMaxValues(blockMinMaxMap.get(index).getMax()); - tmpminMaxIndexBlockDetails.setBlockletId(index); - minMaxIndexBlockDetails.add(tmpminMaxIndexBlockDetails); - } - return minMaxIndexBlockDetails; - } - - /** - * Write the data to a file. This is JSON format file. - * @param minMaxIndexBlockDetails - * @param blockId - * @throws IOException - */ - public void writeMinMaxIndexFile(List minMaxIndexBlockDetails, - String blockId) throws IOException { - String filePath = dataMapPath + File.separator + blockId + ".minmaxindex"; - BufferedWriter brWriter = null; - DataOutputStream dataOutStream = null; - try { - FileFactory.createNewFile(filePath, FileFactory.getFileType(filePath)); - dataOutStream = FileFactory.getDataOutputStream(filePath, FileFactory.getFileType(filePath)); - Gson gsonObjectToWrite = new Gson(); - brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream, "UTF-8")); - String minmaxIndexData = gsonObjectToWrite.toJson(minMaxIndexBlockDetails); - brWriter.write(minmaxIndexData); - } catch (IOException ioe) { - LOGGER.info("Error in writing minMaxindex file"); - throw ioe; - } finally { - if (null != brWriter) { - brWriter.flush(); - } - if (null != dataOutStream) { - dataOutStream.flush(); - } - CarbonUtil.closeStreams(brWriter, dataOutStream); - commitFile(filePath); - } - } - - @Override public void finish() throws IOException { - updateMinMaxIndex(indexShardName); - } - - /** - * create and return path that will store the datamap - * - * @param dataPath patch to store the carbondata factdata - * @param dataMapName datamap name - * @return path to store the datamap - * @throws IOException - */ - 
public static String genDataMapStorePath(String dataPath, String dataMapName) - throws IOException { - String dmDir = dataPath + File.separator + dataMapName; - Path dmPath = FileFactory.getPath(dmDir); - FileSystem fs = FileFactory.getFileSystem(dmPath); - if (!fs.exists(dmPath)) { - fs.mkdirs(dmPath); - } - return dmDir; - } -} \ No newline at end of file diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java deleted file mode 100644 index 93a453edac3..00000000000 --- a/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.datamap.examples; - -import java.io.Serializable; - -public class MinMaxIndexBlockDetails implements Serializable { - private static final long serialVersionUID = 1206104914911491724L; - - /** - * Min value of a column of one blocklet Bit-Packed - */ - private byte[][] minValues; - - /** - * Max value of a columns of one blocklet Bit-Packed - */ - private byte[][] maxValues; - - /** - * BlockletID of the block. - */ - private Integer BlockletId; - - - public byte[][] getMinValues() { - return minValues; - } - - public void setMinValues(byte[][] minValues) { - this.minValues = minValues; - } - - public byte[][] getMaxValues() { - return maxValues; - } - - public void setMaxValues(byte[][] maxValues) { - this.maxValues = maxValues; - } - - public Integer getBlockletId() { - return BlockletId; - } - - public void setBlockletId(Integer blockletId) { - BlockletId = blockletId; - } -} \ No newline at end of file diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java deleted file mode 100644 index 546024747c6..00000000000 --- a/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.datamap.examples; - -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datamap.dev.DataMapModel; -import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.fileoperations.AtomicFileOperations; -import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory; -import org.apache.carbondata.core.indexstore.Blocklet; -import org.apache.carbondata.core.indexstore.PartitionSpec; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.scan.filter.FilterUtil; -import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.util.CarbonUtil; - -import com.google.gson.Gson; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; - -/** - * Datamap implementation for min max blocklet. 
- */ -public class MinMaxIndexDataMap extends CoarseGrainDataMap { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(MinMaxIndexDataMap.class.getName()); - - private String[] indexFilePath; - - private MinMaxIndexBlockDetails[][] readMinMaxDataMap; - - @Override - public void init(DataMapModel model) throws MemoryException, IOException { - Path indexPath = FileFactory.getPath(model.getFilePath()); - - FileSystem fs = FileFactory.getFileSystem(indexPath); - if (!fs.exists(indexPath)) { - throw new IOException( - String.format("Path %s for MinMax index dataMap does not exist", indexPath)); - } - if (!fs.isDirectory(indexPath)) { - throw new IOException( - String.format("Path %s for MinMax index dataMap must be a directory", indexPath)); - } - - FileStatus[] indexFileStatus = fs.listStatus(indexPath, new PathFilter() { - @Override public boolean accept(Path path) { - return path.getName().endsWith(".minmaxindex"); - } - }); - - this.indexFilePath = new String[indexFileStatus.length]; - this.readMinMaxDataMap = new MinMaxIndexBlockDetails[indexFileStatus.length][]; - for (int i = 0; i < indexFileStatus.length; i++) { - this.indexFilePath[i] = indexFileStatus[i].getPath().toString(); - this.readMinMaxDataMap[i] = readJson(this.indexFilePath[i]); - } - } - - private MinMaxIndexBlockDetails[] readJson(String filePath) { - Gson gsonObjectToRead = new Gson(); - DataInputStream dataInputStream = null; - BufferedReader buffReader = null; - InputStreamReader inStream = null; - MinMaxIndexBlockDetails[] readMinMax = null; - AtomicFileOperations fileOperation = - AtomicFileOperationFactory.getAtomicFileOperations(filePath); - - try { - if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) { - return null; - } - dataInputStream = fileOperation.openForRead(); - inStream = new InputStreamReader(dataInputStream, "UTF-8"); - buffReader = new BufferedReader(inStream); - readMinMax = gsonObjectToRead.fromJson(buffReader, 
MinMaxIndexBlockDetails[].class); - } catch (IOException e) { - return null; - } finally { - CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); - } - return readMinMax; - } - - /** - * Block Prunning logic for Min Max DataMap. - * - * @param filterExp - * @param segmentProperties - * @return - */ - @Override - public List prune(FilterResolverIntf filterExp, - SegmentProperties segmentProperties, List partitions) { - List blocklets = new ArrayList<>(); - - if (filterExp == null) { - for (int i = 0; i < readMinMaxDataMap.length; i++) { - for (int j = 0; j < readMinMaxDataMap[i].length; j++) { - blocklets.add(new Blocklet(indexFilePath[i], - String.valueOf(readMinMaxDataMap[i][j].getBlockletId()))); - } - } - } else { - FilterExecuter filterExecuter = - FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); - for (int blkIdx = 0; blkIdx < readMinMaxDataMap.length; blkIdx++) { - for (int blkltIdx = 0; blkltIdx < readMinMaxDataMap[blkIdx].length; blkltIdx++) { - - BitSet bitSet = filterExecuter.isScanRequired( - readMinMaxDataMap[blkIdx][blkltIdx].getMaxValues(), - readMinMaxDataMap[blkIdx][blkltIdx].getMinValues()); - if (!bitSet.isEmpty()) { - String blockFileName = indexFilePath[blkIdx].substring( - indexFilePath[blkIdx].lastIndexOf(File.separatorChar) + 1, - indexFilePath[blkIdx].indexOf(".minmaxindex")); - Blocklet blocklet = new Blocklet(blockFileName, - String.valueOf(readMinMaxDataMap[blkIdx][blkltIdx].getBlockletId())); - LOGGER.info(String.format("MinMaxDataMap: Need to scan block#%s -> blocklet#%s, %s", - blkIdx, blkltIdx, blocklet)); - blocklets.add(blocklet); - } else { - LOGGER.info(String.format("MinMaxDataMap: Skip scan block#%s -> blocklet#%s", - blkIdx, blkltIdx)); - } - } - } - } - return blocklets; - } - - @Override - public boolean isScanRequired(FilterResolverIntf filterExp) { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() { - readMinMaxDataMap = null; - } - - @Override - public void 
finish() { - - } - -} \ No newline at end of file diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java deleted file mode 100644 index 1361d7ab574..00000000000 --- a/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.datamap.examples; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datamap.DataMapDistributable; -import org.apache.carbondata.core.datamap.DataMapMeta; -import org.apache.carbondata.core.datamap.Segment; -import org.apache.carbondata.core.datamap.dev.DataMapBuilder; -import org.apache.carbondata.core.datamap.dev.DataMapModel; -import org.apache.carbondata.core.datamap.dev.DataMapWriter; -import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; -import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.features.TableOperation; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.scan.filter.intf.ExpressionType; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.events.Event; - -import org.apache.commons.lang3.StringUtils; - -/** - * Min Max DataMap Factory - */ -public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory { - private static final LogService LOGGER = LogServiceFactory.getLogService( - MinMaxIndexDataMapFactory.class.getName()); - private DataMapMeta dataMapMeta; - private String dataMapName; - private AbsoluteTableIdentifier identifier; - - public MinMaxIndexDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) { - super(carbonTable, dataMapSchema); - - // this is an example for 
datamap, we can choose the columns and operations that - // will be supported by this datamap. Furthermore, we can add cache-support for this datamap. - - // columns that will be indexed - List allColumns = getCarbonTable().getCreateOrderColumn(identifier.getTableName()); - - // operations that will be supported on the indexed columns - List optOperations = new ArrayList<>(); - optOperations.add(ExpressionType.EQUALS); - optOperations.add(ExpressionType.GREATERTHAN); - optOperations.add(ExpressionType.GREATERTHAN_EQUALTO); - optOperations.add(ExpressionType.LESSTHAN); - optOperations.add(ExpressionType.LESSTHAN_EQUALTO); - optOperations.add(ExpressionType.NOT_EQUALS); - LOGGER.error("MinMaxDataMap support operations: " + StringUtils.join(optOperations, ", ")); - this.dataMapMeta = new DataMapMeta(allColumns, optOperations); - } - - /** - * createWriter will return the MinMaxDataWriter. - * - * @param segment - * @param shardName - * @return - */ - @Override - public DataMapWriter createWriter(Segment segment, String shardName, - SegmentProperties segmentProperties) { - return new MinMaxDataWriter(getCarbonTable(), getDataMapSchema(), segment, shardName, - dataMapMeta.getIndexedColumns()); - } - - @Override - public DataMapBuilder createBuilder(Segment segment, String shardName, - SegmentProperties segmentProperties) throws IOException { - return null; - } - - /** - * getDataMaps Factory method Initializes the Min Max Data Map and returns. - * - * @param segment - * @return - * @throws IOException - */ - @Override - public List getDataMaps(Segment segment) - throws IOException { - List dataMapList = new ArrayList<>(); - // Form a dataMap of Type MinMaxIndexDataMap. 
- MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap(); - try { - dataMap.init(new DataMapModel( - MinMaxDataWriter.genDataMapStorePath( - CarbonTablePath.getSegmentPath( - identifier.getTablePath(), segment.getSegmentNo()), - dataMapName))); - } catch (MemoryException ex) { - throw new IOException(ex); - } - dataMapList.add(dataMap); - return dataMapList; - } - - /** - * @param segment - * @return - */ - @Override public List toDistributable(Segment segment) { - return null; - } - - /** - * Clear the DataMap. - * - * @param segment - */ - @Override public void clear(Segment segment) { - } - - /** - * Clearing the data map. - */ - @Override public void clear() { - } - - @Override public List getDataMaps(DataMapDistributable distributable) - throws IOException { - return getDataMaps(distributable.getSegment()); - } - - @Override public void fireEvent(Event event) { - - } - - @Override public DataMapMeta getMeta() { - return this.dataMapMeta; - } - - @Override - public void deleteDatamapData(Segment segment) throws IOException { - - } - - @Override public void deleteDatamapData() { - - } - - @Override public boolean willBecomeStale(TableOperation operation) { - return false; - } -} \ No newline at end of file diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java new file mode 100644 index 00000000000..3df55387610 --- /dev/null +++ b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.minmax; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert; +import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector; +import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector; +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.util.CarbonUtil; + +/** + * We will record the min & max value for each index column in each blocklet. 
+ * Since the size of index is quite small, we will combine the index for all index columns + * in one file. + */ +public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter { + private static final LogService LOGGER = LogServiceFactory.getLogService( + AbstractMinMaxDataMapWriter.class.getName()); + + private ColumnPageStatsCollector[] indexColumnMinMaxCollectors; + protected int currentBlockletId; + private String currentIndexFile; + private DataOutputStream currentIndexFileOutStream; + + public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName, + List indexColumns, Segment segment, String shardName) throws IOException { + super(tablePath, dataMapName, indexColumns, segment, shardName); + initStatsCollector(); + initDataMapFile(); + } + + private void initStatsCollector() { + indexColumnMinMaxCollectors = new ColumnPageStatsCollector[indexColumns.size()]; + for (int i = 0; i < indexColumns.size(); i++) { + if (indexColumns.get(i).isDimension()) { + indexColumnMinMaxCollectors[i] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY); + } else if (indexColumns.get(i).isMeasure()) { + indexColumnMinMaxCollectors[i] = PrimitivePageStatsCollector.newInstance( + indexColumns.get(i).getDataType()); + } else { + throw new UnsupportedOperationException( + "MinMax datamap only supports dimension and measure"); + } + } + } + + private void initDataMapFile() throws IOException { + if (!FileFactory.isFileExist(dataMapPath) && + !FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) { + throw new IOException("Failed to create directory " + dataMapPath); + } + + try { + currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath, + MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size()); + FileFactory.createNewFile(currentIndexFile, FileFactory.getFileType(currentIndexFile)); + currentIndexFileOutStream = FileFactory.getDataOutputStream(currentIndexFile, + FileFactory.getFileType(currentIndexFile)); + } catch 
(IOException e) { + CarbonUtil.closeStreams(currentIndexFileOutStream); + LOGGER.error(e, "Failed to init datamap index file"); + throw e; + } + } + + protected void resetBlockletLevelMinMax() { + for (int i = 0; i < indexColumns.size(); i++) { + indexColumnMinMaxCollectors[i].getPageStats().clear(); + } + } + + @Override + public void onBlockStart(String blockId) { + } + + @Override + public void onBlockEnd(String blockId) { + } + + @Override public void onBlockletStart(int blockletId) { + } + + @Override public void onBlockletEnd(int blockletId) { + flushMinMaxIndexFile(); + currentBlockletId++; + } + + @Override + public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) { + // as an example, we don't use page-level min-max generated by native carbondata here, we get + // the min-max by comparing each row + for (int rowId = 0; rowId < pageSize; rowId++) { + for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) { + Object originValue = pages[colIdx].getData(rowId); + updateBlockletMinMax(colIdx, originValue); + } + } + } + + protected void updateBlockletMinMax(int indexColIdx, Object value) { + if (null == value) { + indexColumnMinMaxCollectors[indexColIdx].updateNull(0); + return; + } + + DataType dataType = indexColumns.get(indexColIdx).getDataType(); + if (indexColumns.get(indexColIdx).isMeasure()) { + if (DataTypes.BOOLEAN == dataType) { + indexColumnMinMaxCollectors[indexColIdx].update( + BooleanConvert.boolean2Byte((boolean) value)); + } else if (DataTypes.SHORT == dataType) { + indexColumnMinMaxCollectors[indexColIdx].update((short) value); + } else if (DataTypes.INT == dataType) { + indexColumnMinMaxCollectors[indexColIdx].update((int) value); + } else if (DataTypes.LONG == dataType) { + indexColumnMinMaxCollectors[indexColIdx].update((long) value); + } else if (DataTypes.DOUBLE == dataType) { + indexColumnMinMaxCollectors[indexColIdx].update((double) value); + } else if (DataTypes.isDecimal(dataType)) { + 
indexColumnMinMaxCollectors[indexColIdx].update((BigDecimal) value); + } else { + throw new UnsupportedOperationException("unsupported data type " + dataType); + } + } else { + // While pruning for query, we want to reuse the pruning method from carbon, so here for + // dictionary columns, we need to store the mdk value in the minmax index. + // For direct generating, the input value is already MDK; For late building, the input value + // is surrogate key, so we need to handle it here. + if (indexColumns.get(indexColIdx).hasEncoding(Encoding.DICTIONARY)) { + indexColumnMinMaxCollectors[indexColIdx].update(convertDictValueToMdk(indexColIdx, value)); + } else { + byte[] plainValue = convertNonDicValueToPlain(indexColIdx, (byte[]) value); + indexColumnMinMaxCollectors[indexColIdx].update(plainValue); + } + } + } + + protected abstract byte[] convertDictValueToMdk(int indexColIdx, Object value); + + protected abstract byte[] convertNonDicValueToPlain(int indexColIdx, byte[] value); + + private void logMinMaxInfo(int indexColId) { + StringBuilder sb = new StringBuilder("flush blockletId->").append(currentBlockletId) + .append(", column->").append(indexColumns.get(indexColId).getColName()) + .append(", dataType->").append(indexColumns.get(indexColId).getDataType().getName()); + Object min = indexColumnMinMaxCollectors[indexColId].getPageStats().getMin(); + Object max = indexColumnMinMaxCollectors[indexColId].getPageStats().getMax(); + if (indexColumns.get(indexColId).isDimension()) { + sb.append(", min->") + .append(new String((byte[]) min, CarbonCommonConstants.DEFAULT_CHARSET_CLASS)) + .append(", max->") + .append(new String((byte[]) max, CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); + } else { + sb.append(", min->").append(min) + .append(", max->").append(max); + } + LOGGER.debug(sb.toString()); + } + + /** + * Write the data to a file. 
+ */ + protected void flushMinMaxIndexFile() { + try { + MinMaxIndexHolder minMaxIndexHolder = new MinMaxIndexHolder(indexColumns.size()); + minMaxIndexHolder.setBlockletId(currentBlockletId); + for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { + if (LOGGER.isDebugEnabled()) { + logMinMaxInfo(indexColId); + } + minMaxIndexHolder.setMinValueAtPos(indexColId, CarbonUtil.getValueAsBytes( + indexColumnMinMaxCollectors[indexColId].getPageStats().getDataType(), + indexColumnMinMaxCollectors[indexColId].getPageStats().getMin())); + minMaxIndexHolder.setMaxValueAtPos(indexColId, CarbonUtil.getValueAsBytes( + indexColumnMinMaxCollectors[indexColId].getPageStats().getDataType(), + indexColumnMinMaxCollectors[indexColId].getPageStats().getMax())); + } + minMaxIndexHolder.write(currentIndexFileOutStream); + currentIndexFileOutStream.flush(); + } catch (IOException e) { + LOGGER.error(e, "Failed to flush minmax index to file " + currentIndexFile); + releaseResource(); + throw new RuntimeException(e); + } finally { + resetBlockletLevelMinMax(); + } + } + + @Override + public void finish() throws IOException { + if (!isWritingFinished()) { + releaseResource(); + setWritingFinished(true); + } + } + + protected void releaseResource() { + CarbonUtil.closeStreams(currentIndexFileOutStream); + } +} diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapBuilder.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapBuilder.java new file mode 100644 index 00000000000..a388b98a87d --- /dev/null +++ b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapBuilder.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.minmax; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.DataMapBuilder; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.scan.filter.FilterUtil; + +public class MinMaxDataMapBuilder extends AbstractMinMaxDataMapWriter implements DataMapBuilder { + private KeyGenerator keyGenerator; + + MinMaxDataMapBuilder(String tablePath, String dataMapName, List indexColumns, + Segment segment, String shardName, SegmentProperties segmentProperties) throws IOException { + super(tablePath, dataMapName, indexColumns, segment, shardName); + for (CarbonColumn col : indexColumns) { + if (col.hasEncoding(Encoding.DICTIONARY)) { + initKeyGenerator(segmentProperties); + break; + } + } + } + + private void initKeyGenerator(SegmentProperties segmentProperties) { + keyGenerator = segmentProperties.getDimensionKeyGenerator(); + } + + @Override + public void initialize() throws IOException { + super.resetBlockletLevelMinMax(); + } + 
+ @Override + public void addRow(int blockletId, int pageId, int rowId, Object[] values) { + if (currentBlockletId != blockletId) { + // new blocklet started, flush min-max index to datamap file + super.flushMinMaxIndexFile(); + currentBlockletId = blockletId; + } + // for each indexed column, add the data to the min-max stats collector + for (int i = 0; i < indexColumns.size(); i++) { + Object data = values[i]; + updateBlockletMinMax(i, data); + } + } + + @Override + protected byte[] convertNonDicValueToPlain(int indexColIdx, byte[] value) { + return value; + } + + @Override + public void finish() throws IOException { + if (!isWritingFinished()) { + flushMinMaxIndexFile(); + releaseResource(); + setWritingFinished(true); + } + } + + @Override + protected byte[] convertDictValueToMdk(int indexColIdx, Object value) { + // input value from IndexDataMapRebuildRDD is already decoded as surrogate key + // we need to convert the surrogate key to MDK now + CarbonColumn carbonColumn = indexColumns.get(indexColIdx); + assert (carbonColumn instanceof CarbonDimension); + return FilterUtil.getMaskKey((int) value, (CarbonDimension) carbonColumn, keyGenerator); + } + + @Override + public void close() throws IOException { + releaseResource(); + } + + @Override + public boolean isIndexForCarbonRawBytes() { + return true; + } +} diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapCache.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapCache.java new file mode 100644 index 00000000000..b3f7e0b1e25 --- /dev/null +++ b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapCache.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.minmax; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CarbonLRUCache; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +/** + * cache minmax datamap models using carbon lru cache. 
+ */ +public class MinMaxDataMapCache + implements Cache { + private static final LogService LOGGER = + LogServiceFactory.getLogService(MinMaxDataMapCache.class.getName()); + private CarbonLRUCache lruCache; + + public MinMaxDataMapCache(CarbonLRUCache lruCache) { + this.lruCache = lruCache; + } + + @Override + public MinMaxDataMapCacheKeyValue.Value get(MinMaxDataMapCacheKeyValue.Key key) + throws IOException { + MinMaxDataMapCacheKeyValue.Value cacheValue = getIfPresent(key); + if (null == cacheValue) { + cacheValue = loadMinMaxModels(FileFactory.getPath(key.getShardPath())); + lruCache.put(key.uniqueString(), cacheValue, cacheValue.getMemorySize()); + } + return cacheValue; + } + + private MinMaxDataMapCacheKeyValue.Value loadMinMaxModels(Path shardPath) throws IOException { + FileSystem fs = FileFactory.getFileSystem(shardPath); + if (!fs.exists(shardPath)) { + throw new IOException( + String.format("Path %s for MinMax index dataMap does not exist", shardPath)); + } + if (!fs.isDirectory(shardPath)) { + throw new IOException( + String.format("Path %s for MinMax index dataMap must be a directory", shardPath)); + } + FileStatus[] indexFileStatus = fs.listStatus(shardPath, new PathFilter() { + @Override public boolean accept(Path path) { + return path.getName().endsWith(".minmaxindex"); + } + }); + + List minMaxIndexHolderList = new ArrayList<>(); + for (int i = 0; i < indexFileStatus.length; i++) { + List dataMapModels = + loadMinMaxIndexFromFile(indexFileStatus[i].getPath().toString()); + minMaxIndexHolderList.addAll(dataMapModels); + } + return new MinMaxDataMapCacheKeyValue.Value(minMaxIndexHolderList); + } + + private List loadMinMaxIndexFromFile(String indexFile) throws IOException { + LOGGER.info("load minmax datamap model from file " + indexFile); + List dataMapModels = new ArrayList<>(); + DataInputStream inputStream = null; + try { + inputStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile)); + String fileName = new 
Path(indexFile).getName(); + int indexColCnt = Integer.parseInt(fileName.substring( + MinMaxIndexHolder.MINMAX_INDEX_PREFFIX.length(), + fileName.indexOf(MinMaxIndexHolder.MINMAX_INDEX_SUFFIX))); + while (inputStream.available() > 0) { + MinMaxIndexHolder minMaxIndexHolder = new MinMaxIndexHolder(indexColCnt); + minMaxIndexHolder.readFields(inputStream); + dataMapModels.add(minMaxIndexHolder); + } + LOGGER.info(String.format("Read %d minmax indices from %s", dataMapModels.size(), indexFile)); + return dataMapModels; + } catch (Exception e) { + LOGGER.error(e, "Failed to load minmax index from file"); + throw new IOException(e); + } finally { + CarbonUtil.closeStreams(inputStream); + } + } + + @Override + public List getAll(List keys) + throws IOException { + List cacheValues = new ArrayList<>(keys.size()); + for (MinMaxDataMapCacheKeyValue.Key key : keys) { + cacheValues.add(get(key)); + } + return cacheValues; + } + + @Override + public MinMaxDataMapCacheKeyValue.Value getIfPresent(MinMaxDataMapCacheKeyValue.Key key) { + return (MinMaxDataMapCacheKeyValue.Value) lruCache.get(key.uniqueString()); + } + + @Override + public void invalidate(MinMaxDataMapCacheKeyValue.Key key) { + lruCache.remove(key.uniqueString()); + } + + @Override + public void put(MinMaxDataMapCacheKeyValue.Key key, MinMaxDataMapCacheKeyValue.Value value) + throws IOException, MemoryException { + lruCache.put(key.uniqueString(), value, value.getMemorySize()); + } + + @Override + public void clearAccessCount(List keys) { + LOGGER.error("clearAccessCount is not implemented for MinMaxDataMapCache"); + } +} diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapCacheKeyValue.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapCacheKeyValue.java new file mode 100644 index 00000000000..5b29c11441c --- /dev/null +++ b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapCacheKeyValue.java @@ -0,0 +1,82 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.minmax; + +import java.io.Serializable; +import java.util.List; + +import org.apache.carbondata.core.cache.Cacheable; + +public class MinMaxDataMapCacheKeyValue { + + public static class Key implements Serializable { + private static final long serialVersionUID = -147823808435277L; + private String shardPath; + + public Key(String shardPath) { + this.shardPath = shardPath; + } + + public String getShardPath() { + return shardPath; + } + + @Override + public String toString() { + final StringBuffer sb = new StringBuffer("Key{"); + sb.append("shardPath='").append(shardPath).append('\''); + sb.append('}'); + return sb.toString(); + } + + public String uniqueString() { + return "minmaxcache_" + shardPath; + } + } + + public static class Value implements Cacheable { + private List minMaxIndexHolders; + private int size; + + public Value(List minMaxIndexHolders) { + this.minMaxIndexHolders = minMaxIndexHolders; + for (MinMaxIndexHolder model : minMaxIndexHolders) { + this.size += model.getSize(); + } + } + + public List getMinMaxIndexHolders() { + return minMaxIndexHolders; + } + + @Override + public long getFileTimeStamp() { + 
return 0; + } + + @Override + public int getAccessCount() { + return 0; + } + + @Override + public long getMemorySize() { + return size; + } + } +} diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapDirectWriter.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapDirectWriter.java new file mode 100644 index 00000000000..f3ff99acd42 --- /dev/null +++ b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapDirectWriter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.minmax; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; + +@InterfaceAudience.Internal +public class MinMaxDataMapDirectWriter extends AbstractMinMaxDataMapWriter { + + MinMaxDataMapDirectWriter(String tablePath, String dataMapName, List indexColumns, + Segment segment, String shardName, SegmentProperties segmentProperties) throws IOException { + super(tablePath, dataMapName, indexColumns, segment, shardName); + } + + protected byte[] convertNonDicValueToPlain(int indexColIdx, byte[] lvData) { + int lenInLV = (DataTypes.VARCHAR == indexColumns.get(indexColIdx).getDataType()) ? 
+ CarbonCommonConstants.INT_SIZE_IN_BYTE : CarbonCommonConstants.SHORT_SIZE_IN_BYTE; + byte[] indexValue = new byte[lvData.length - lenInLV]; + System.arraycopy(lvData, lenInLV, indexValue, 0, lvData.length - lenInLV); + return indexValue; + } + + @Override + protected byte[] convertDictValueToMdk(int indexColIdx, Object value) { + // input value from onPageAdded in load process is byte[] + return (byte[]) value; + } +} diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapDistributable.java similarity index 62% rename from datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java rename to datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapDistributable.java index e6968fe32ef..be94fda8c82 100644 --- a/datamap/examples/src/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java +++ b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapDistributable.java @@ -15,27 +15,23 @@ * limitations under the License. 
*/ -package org.apache.carbondata.datamap.examples; +package org.apache.carbondata.datamap.minmax; +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datamap.DataMapDistributable; -public class BlockletMinMax { - private byte[][] Min; +@InterfaceAudience.Internal +public class MinMaxDataMapDistributable extends DataMapDistributable { + /** + * parent folder of the minmaxindex file + */ + private String indexPath; - private byte[][] Max; - - public byte[][] getMin() { - return Min; - } - - public void setMin(byte[][] min) { - Min = min; - } - - public byte[][] getMax() { - return Max; + MinMaxDataMapDistributable(String indexPath) { + this.indexPath = indexPath; } - public void setMax(byte[][] max) { - Max = max; + public String getIndexPath() { + return indexPath; } } diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapFactory.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapFactory.java new file mode 100644 index 00000000000..9e169435351 --- /dev/null +++ b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapFactory.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.minmax; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapLevel; +import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.DataMapStoreManager; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.TableDataMap; +import org.apache.carbondata.core.datamap.dev.DataMapBuilder; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.features.TableOperation; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.scan.filter.intf.ExpressionType; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.events.Event; + +/** + * Min Max DataMap Factory + */ +@InterfaceAudience.Internal +public class MinMaxDataMapFactory extends CoarseGrainDataMapFactory { + private static final LogService LOGGER = + LogServiceFactory.getLogService(MinMaxDataMapFactory.class.getName()); + private DataMapMeta dataMapMeta; + private String dataMapName; + // segmentId -> list of index files + private Map> segmentMap = new ConcurrentHashMap<>(); + private Cache cache; + + public MinMaxDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) + throws MalformedDataMapCommandException { + super(carbonTable, dataMapSchema); + if (!carbonTable.isExternalFormatTable()) { + LOGGER.warn("It is not recommended to use MinMax datamap for ordinary carbon table," + + " because default Block/Blocklet datamap already covers its function." + + " As a result, it will not prune better than default Block/Blocklet datamap," + + " the query performance will be worse."); + } + + // this is an example for datamap, we can choose the columns and operations that + // will be supported by this datamap. Furthermore, we can add cache-support for this datamap. 
+ + this.dataMapName = dataMapSchema.getDataMapName(); + List indexedColumns = carbonTable.getIndexedColumns(dataMapSchema); + + // operations that will be supported on the indexed columns + List optOperations = new ArrayList<>(); + optOperations.add(ExpressionType.NOT); + optOperations.add(ExpressionType.EQUALS); + optOperations.add(ExpressionType.NOT_EQUALS); + optOperations.add(ExpressionType.GREATERTHAN); + optOperations.add(ExpressionType.GREATERTHAN_EQUALTO); + optOperations.add(ExpressionType.LESSTHAN); + optOperations.add(ExpressionType.LESSTHAN_EQUALTO); + optOperations.add(ExpressionType.IN); + this.dataMapMeta = new DataMapMeta(indexedColumns, optOperations); + + // init cache. note that the createCache ensures the singleton of the cache + try { + this.cache = CacheProvider.getInstance() + .createCache(new CacheType("minmax_cache"), MinMaxDataMapCache.class.getName()); + } catch (Exception e) { + LOGGER.error(e, "Failed to create cache for minmax datamap"); + throw new MalformedDataMapCommandException(e.getMessage()); + } + } + + /** + * createWriter will return the MinMaxDataWriter. 
+ * + * @param segment + * @param shardName + * @return + */ + @Override + public DataMapWriter createWriter(Segment segment, String shardName, + SegmentProperties segmentProperties) throws IOException { + LOGGER.error(String + .format("Data of MinMaxDataMap %s for table %s will be written to %s", dataMapName, + getCarbonTable().getTableName(), shardName)); + return new MinMaxDataMapDirectWriter(getCarbonTable().getTablePath(), dataMapName, + dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties); + } + + @Override + public DataMapBuilder createBuilder(Segment segment, String shardName, + SegmentProperties segmentProperties) throws IOException { + return new MinMaxDataMapBuilder(getCarbonTable().getTablePath(), dataMapName, + dataMapMeta.getIndexedColumns(), segment, shardName, segmentProperties); + } + + /** + * getDataMaps Factory method Initializes the Min Max Data Map and returns. + * + * @param segment + * @return + * @throws IOException + */ + @Override + public List getDataMaps(Segment segment) throws IOException { + List dataMaps = new ArrayList<>(); + Set shardPaths = segmentMap.get(segment.getSegmentNo()); + if (shardPaths == null) { + String dataMapStorePath = DataMapWriter.getDefaultDataMapPath( + getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName); + CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles(); + shardPaths = new HashSet<>(); + for (CarbonFile carbonFile : carbonFiles) { + shardPaths.add(carbonFile.getAbsolutePath()); + } + segmentMap.put(segment.getSegmentNo(), shardPaths); + } + + for (String shard : shardPaths) { + MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap(); + dataMap.init(new MinMaxDataMapModel(shard, cache)); + dataMap.initOthers(getCarbonTable(), dataMapMeta.getIndexedColumns()); + dataMaps.add(dataMap); + } + return dataMaps; + } + + @Override + public DataMapMeta getMeta() { + return this.dataMapMeta; + } + + @Override + public DataMapLevel getDataMapLevel() { 
+ return DataMapLevel.CG; + } + + @Override + public List getDataMaps(DataMapDistributable distributable) + throws IOException { + List coarseGrainDataMaps = new ArrayList<>(); + MinMaxIndexDataMap minMaxIndexDataMap = new MinMaxIndexDataMap(); + String indexPath = ((MinMaxDataMapDistributable) distributable).getIndexPath(); + minMaxIndexDataMap.init(new MinMaxDataMapModel(indexPath, cache)); + minMaxIndexDataMap.initOthers(getCarbonTable(), dataMapMeta.getIndexedColumns()); + coarseGrainDataMaps.add(minMaxIndexDataMap); + return coarseGrainDataMaps; + } + + /** + * returns all the directories of lucene index files for query + * Note: copied from BloomFilterDataMapFactory, will extract to a common interface + */ + private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) { + List indexDirs = new ArrayList<>(); + List dataMaps; + try { + // there can be multiple bloom datamaps present on a table, so get all datamaps and form + // the path till the index file directories in all datamaps folders present in each segment + dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable()); + } catch (IOException ex) { + LOGGER.error(ex, String + .format("failed to get datamaps for tablePath %s, segmentId %s", tablePath, segmentId)); + throw new RuntimeException(ex); + } + if (dataMaps.size() > 0) { + for (TableDataMap dataMap : dataMaps) { + if (dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) { + List indexFiles; + String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, segmentId, + dataMap.getDataMapSchema().getDataMapName()); + FileFactory.FileType fileType = FileFactory.getFileType(dmPath); + final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType); + indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() { + @Override + public boolean accept(CarbonFile file) { + return file.isDirectory(); + } + })); + indexDirs.addAll(indexFiles); + } + } + } + return indexDirs.toArray(new 
CarbonFile[0]); + } + + @Override + public List toDistributable(Segment segment) { + List dataMapDistributableList = new ArrayList<>(); + CarbonFile[] indexDirs = + getAllIndexDirs(getCarbonTable().getTablePath(), segment.getSegmentNo()); + if (segment.getFilteredIndexShardNames().size() == 0) { + for (CarbonFile indexDir : indexDirs) { + DataMapDistributable bloomDataMapDistributable = + new MinMaxDataMapDistributable(indexDir.getAbsolutePath()); + dataMapDistributableList.add(bloomDataMapDistributable); + } + return dataMapDistributableList; + } + for (CarbonFile indexDir : indexDirs) { + // Filter out the tasks which are filtered through CG datamap. + if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) { + continue; + } + DataMapDistributable bloomDataMapDistributable = + new MinMaxDataMapDistributable(indexDir.getAbsolutePath()); + dataMapDistributableList.add(bloomDataMapDistributable); + } + return dataMapDistributableList; + } + + @Override + public void fireEvent(Event event) { + + } + + @Override + public void clear(Segment segment) { + Set shards = segmentMap.remove(segment.getSegmentNo()); + if (null != shards) { + for (String shard : shards) { + cache.invalidate(new MinMaxDataMapCacheKeyValue.Key(shard)); + } + } + } + + @Override + public synchronized void clear() { + if (segmentMap.size() > 0) { + List segments = new ArrayList<>(segmentMap.keySet()); + for (String segmentId : segments) { + clear(new Segment(segmentId, null, null)); + } + } + } + + @Override + public void deleteDatamapData(Segment segment) throws IOException { + try { + String segmentId = segment.getSegmentNo(); + String datamapPath = CarbonTablePath + .getDataMapStorePath(getCarbonTable().getTablePath(), segmentId, dataMapName); + if (FileFactory.isFileExist(datamapPath)) { + CarbonFile file = + FileFactory.getCarbonFile(datamapPath, FileFactory.getFileType(datamapPath)); + CarbonUtil.deleteFoldersAndFilesSilent(file); + } + } catch (InterruptedException ex) { + 
throw new IOException("Failed to delete datamap for segment_" + segment.getSegmentNo()); + } + } + + @Override + public void deleteDatamapData() { + SegmentStatusManager ssm = + new SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier()); + try { + List validSegments = ssm.getValidAndInvalidSegments().getValidSegments(); + for (Segment segment : validSegments) { + deleteDatamapData(segment); + } + } catch (IOException e) { + LOGGER.error("drop datamap failed, failed to delete datamap directory"); + } + } + + @Override + public boolean willBecomeStale(TableOperation operation) { + switch (operation) { + case ALTER_RENAME: + return false; + case ALTER_DROP: + return true; + case ALTER_ADD_COLUMN: + return false; + case ALTER_CHANGE_DATATYPE: + return true; + case STREAMING: + return false; + case DELETE: + return true; + case UPDATE: + return true; + case PARTITION: + return true; + default: + return false; + } + } + + @Override + public boolean isOperationBlocked(TableOperation operation, Object... 
targets) { + switch (operation) { + case ALTER_DROP: { + // alter table drop columns + // will be blocked if the columns in bloomfilter datamap + List columnsToDrop = (List) targets[0]; + List indexedColumnNames = dataMapMeta.getIndexedColumnNames(); + for (String indexedcolumn : indexedColumnNames) { + for (String column : columnsToDrop) { + if (column.equalsIgnoreCase(indexedcolumn)) { + return true; + } + } + } + return false; + } + case ALTER_CHANGE_DATATYPE: { + // alter table change one column datatype + // will be blocked if the column in bloomfilter datamap + String columnToChangeDatatype = (String) targets[0]; + List indexedColumnNames = dataMapMeta.getIndexedColumnNames(); + for (String indexedcolumn : indexedColumnNames) { + if (indexedcolumn.equalsIgnoreCase(columnToChangeDatatype)) { + return true; + } + } + return false; + } + default: + return false; + } + } + + @Override + public boolean supportRebuild() { + return false; + } +} diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapModel.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapModel.java new file mode 100644 index 00000000000..06df0a628a5 --- /dev/null +++ b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapModel.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.minmax; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.datamap.dev.DataMapModel; + +public class MinMaxDataMapModel extends DataMapModel { + private Cache cache; + + public MinMaxDataMapModel(String filePath, + Cache cache) { + super(filePath); + this.cache = cache; + } + + public Cache getCache() { + return cache; + } +} diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxIndexDataMap.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxIndexDataMap.java new file mode 100644 index 00000000000..b0de19aeb24 --- /dev/null +++ b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxIndexDataMap.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.minmax; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; + +import org.apache.hadoop.fs.Path; + +/** + * Datamap implementation for min max blocklet. 
+ */ +@InterfaceAudience.Internal +public class MinMaxIndexDataMap extends CoarseGrainDataMap { + private static final LogService LOGGER = + LogServiceFactory.getLogService(MinMaxIndexDataMap.class.getName()); + private Cache cache; + private MinMaxDataMapCacheKeyValue.Key cacheKey; + private List indexColumns; + private String shardName; + + @Override + public void init(DataMapModel model) throws IOException { + Path shardPath = FileFactory.getPath(model.getFilePath()); + this.shardName = shardPath.getName(); + assert model instanceof MinMaxDataMapModel; + this.cache = ((MinMaxDataMapModel) model).getCache(); + this.cacheKey = new MinMaxDataMapCacheKeyValue.Key(model.getFilePath()); + } + + /** + * init field converters for index columns + */ + public void initOthers(CarbonTable carbonTable, List indexedColumn) + throws IOException { + this.indexColumns = indexedColumn; + } + + /** + * Block Prunning logic for Min Max DataMap. It will reuse the pruning procedure. + */ + @Override + public List prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, + List partitions) throws IOException { + Objects.requireNonNull(filterExp); + List hitBlocklets = new ArrayList<>(); + FilterExecuter filterExecuter = + FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null, indexColumns); + List minMaxIndexHolders = cache.get(cacheKey).getMinMaxIndexHolders(); + for (int i = 0; i < minMaxIndexHolders.size(); i++) { + byte[][] minValues = minMaxIndexHolders.get(i).getMinValues(); + byte[][] maxValues = minMaxIndexHolders.get(i).getMaxValues(); + + BitSet bitSet = filterExecuter.isScanRequired(maxValues, minValues); + if (!bitSet.isEmpty()) { + LOGGER.debug(String.format("MinMaxDataMap: Need to scan -> blocklet#%s", + minMaxIndexHolders.get(i).getBlockletId())); + Blocklet blocklet = + new Blocklet(shardName, String.valueOf(minMaxIndexHolders.get(i).getBlockletId())); + hitBlocklets.add(blocklet); + } else { + LOGGER.debug(String.format("MinMaxDataMap: 
Skip scan -> blocklet#%s", + minMaxIndexHolders.get(i).getBlockletId())); + } + } + return hitBlocklets; + } + + @Override + public boolean isScanRequired(FilterResolverIntf filterExp) { + return true; + } + + @Override + public void clear() { + } + + public static String getIndexFile(String shardPath, String combineColumn) { + return shardPath.concat(File.separator).concat(combineColumn) + .concat(MinMaxIndexHolder.MINMAX_INDEX_SUFFIX); + } + + @Override + public void finish() { + + } +} diff --git a/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxIndexHolder.java b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxIndexHolder.java new file mode 100644 index 00000000000..c1e1b2a0781 --- /dev/null +++ b/datamap/examples/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxIndexHolder.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.minmax; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +import org.apache.carbondata.core.metadata.schema.table.Writable; + +public class MinMaxIndexHolder implements Writable, Serializable { + private static final long serialVersionUID = 72314567865325672L; + public static final String MINMAX_INDEX_SUFFIX = ".minmaxindex"; + public static final String MINMAX_INDEX_PREFFIX = "combine_"; + + /** + * BlockletID of the block. + */ + private int blockletId; + + /** + * Min values of each index column of one blocklet Bit-Packed + */ + private byte[][] minValues; + + /** + * Max values of each index column of one blocklet Bit-Packed + */ + private byte[][] maxValues; + + public MinMaxIndexHolder(int colNum) { + minValues = new byte[colNum][]; + maxValues = new byte[colNum][]; + } + + /** + * calculate the memory size of this object + */ + public long getSize() { + // for blockletId + long size = 4; + for (int i = 0; i < minValues.length; i++) { + if (minValues[i] != null) { + size += minValues[i].length; + } + if (maxValues[i] != null) { + size += maxValues[i].length; + } + } + return size; + } + + public byte[][] getMinValues() { + return minValues; + } + + public void setMinValues(byte[][] minValues) { + this.minValues = minValues; + } + + public byte[][] getMaxValues() { + return maxValues; + } + + public void setMaxValues(byte[][] maxValues) { + this.maxValues = maxValues; + } + + public int getBlockletId() { + return blockletId; + } + + public void setBlockletId(int blockletId) { + this.blockletId = blockletId; + } + + public byte[] getMinValueAtPos(int pos) { + return minValues[pos]; + } + + public void setMinValueAtPos(int pos, byte[] minValue) { + this.minValues[pos] = minValue; + } + + public byte[] getMaxValueAtPos(int pos) { + return maxValues[pos]; + } + + public void setMaxValueAtPos(int pos, byte[] maxValue) { + 
this.maxValues[pos] = maxValue; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(blockletId); + out.writeInt(minValues.length); + for (int i = 0; i < minValues.length; i++) { + out.writeInt(minValues[i].length); + out.write(minValues[i]); + out.writeInt(maxValues[i].length); + out.write(maxValues[i]); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + blockletId = in.readInt(); + int valueCnt = in.readInt(); + minValues = new byte[valueCnt][]; + maxValues = new byte[valueCnt][]; + for (int i = 0; i < valueCnt; i++) { + int minValueLength = in.readInt(); + minValues[i] = new byte[minValueLength]; + in.readFully(minValues[i]); + int maxValueLength = in.readInt(); + maxValues[i] = new byte[maxValueLength]; + in.readFully(maxValues[i]); + } + } + + @Override + public String toString() { + final StringBuffer sb = new StringBuffer("MinMaxIndexHolder{"); + sb.append("blockletId=").append(blockletId); + sb.append(",minmaxValues={"); + for (int i = 0; i < minValues.length; i++) { + sb.append("minValue=").append(Arrays.toString(minValues[i])) + .append(", maxValue=").append(Arrays.toString(maxValues[i])); + } + sb.append("}}"); + return sb.toString(); + } +} diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/minmax/MinMaxDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/minmax/MinMaxDataMapFunctionSuite.scala new file mode 100644 index 00000000000..fc82690bb74 --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/minmax/MinMaxDataMapFunctionSuite.scala @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.minmax + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class MinMaxDataMapFunctionSuite extends QueryTest with BeforeAndAfterAll { + private val minmaxDataMapFactoryName = "org.apache.carbondata.datamap.minmax.MinMaxDataMapFactory" + + override protected def beforeAll(): Unit = { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + "yyyy-MM-dd") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + "yyyy-MM-dd HH:mm:ss") + } + + override protected def afterAll(): Unit = { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + } + + private def createAllDataTypeTable(tableName: String): Unit = { + sql(s"DROP TABLE IF EXISTS $tableName") + sql( + s""" + | CREATE TABLE $tableName( + | booleanField boolean, + | shortField smallint, + | intField int, + | bigintField bigint, + | doubleField double, + | stringField string, + | timestampField timestamp, + | decimalField 
decimal(18,2), + | dateField date, + | charField string, + | floatField float, + | stringDictField string, + | stringSortField string, + | stringLocalDictField string, + | longStringField string + | ) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'LONG_STRING_COLUMNS'='longStringField', + | 'SORT_COLUMNS'='stringSortField', + | 'DICTIONARY_INCLUDE'='stringDictField', + | 'local_dictionary_enable'='true', + | 'local_dictionary_threshold'='10000', + | 'local_dictionary_include'='stringLocalDictField', + | 'CACHE_LEVEL'='BLOCKLET') + """.stripMargin) + } + + private def loadAllDataTypeTable(tableName: String): Unit = { + sql( + s""" + | INSERT INTO TABLE $tableName VALUES + | (true,1,11,101,41.4,'string1','2015-04-23 12:01:01',12.34,'2015-04-23','aaa',1.5,'dict1','sort1','local_dict1','longstring1'), + | (false,2,12,102,42.4,'string2','2015-05-23 12:01:03',23.45,'2015-05-23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'), + | (true,3,13,163,43.4,'string3','2015-07-26 12:01:06',34.56,'2015-07-26','ccc',3.5,'dict3','sort3','local_dict3','longstring3'), + | (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) + """.stripMargin) + sql( + s""" + | INSERT INTO TABLE $tableName VALUES + | (true,${Short.MaxValue - 2},${Int.MinValue + 2},${Long.MaxValue - 2},${Double.MinValue + 2},'string1','2015-04-23 12:01:01',${Double.MinValue + 2},'2015-04-23','aaa',${Float.MaxValue - 2},'dict1','sort1','local_dict1','longstring1'), + | (false,2,12,102,42.4,'string2','2015-05-23 12:01:03',23.45,'2015-05-23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'), + | (true,${Short.MinValue + 2},${Int.MaxValue - 2},${Long.MinValue + 2},${Double.MaxValue - 2},'string3','2015-07-26 12:01:06',${Double.MinValue + 2},'2015-07-26','ccc',${Float.MinValue + 2},'dict3','sort3','local_dict3','longstring3'), + | (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) + """.stripMargin) + } + + test("create minmax datamap on all datatypes with 
direct write and late rebuild") { + val normalTableName = "normal_all_data_types" + val tableName = "minmax_all_data_types" + val dataMapName = "dm_minmax_with_all_data_types" + sql(s"DROP TABLE IF EXISTS $normalTableName") + sql(s"DROP TABLE IF EXISTS $tableName") + createAllDataTypeTable(normalTableName) + createAllDataTypeTable(tableName) + loadAllDataTypeTable(normalTableName) + loadAllDataTypeTable(tableName) + // create datamap on all supported datatype + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $tableName + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='booleanField, shortField, intField, bigintField, doubleField, stringField, timestampField, decimalField, dateField, charField, floatField, stringDictField, stringSortField, stringLocalDictField, longStringField' + | ) + """.stripMargin) + loadAllDataTypeTable(normalTableName) + loadAllDataTypeTable(tableName) + checkExistence(sql(s"SHOW DATAMAP ON TABLE $tableName"), + true, minmaxDataMapFactoryName, dataMapName) + checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), sql(s"SELECT COUNT(*) FROM $normalTableName")) + checkAnswer(sql(s"SELECT COUNT(*) FROM (SELECT * FROM $tableName WHERE dateField='2015-05-23' AND timestampField='2015-05-23 12:01:03') b"), + sql(s"SELECT COUNT(*) FROM (SELECT * FROM $normalTableName WHERE dateField='2015-05-23' AND timestampField='2015-05-23 12:01:03') b")) + checkExistence( + sql(s"EXPLAIN SELECT * FROM $tableName WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND doubleField=42.4 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2'"), + true, minmaxDataMapFactoryName, dataMapName) + checkAnswer( + sql(s"SELECT * FROM $tableName WHERE booleanField=false AND shortField=2 AND 
intField=12 AND bigintField=102 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2'"), + sql(s"SELECT * FROM $normalTableName WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2'")) + checkAnswer( + sql(s"SELECT COUNT(*) FROM (SELECT * FROM $tableName WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2') b"), + sql(s"SELECT COUNT(*) FROM (SELECT * FROM $normalTableName WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2') b")) + sql(s"DROP TABLE IF EXISTS $normalTableName") + sql(s"DROP TABLE IF EXISTS $tableName") + } + + test("create minmax datamap on all datatypes with sort columns with direct write and late rebuild") { + val normalTableName = "normal_all_data_types_with_sort_column" + val tableName = "minmax_all_data_types_with_sort_column" + val dataMapName = "dm_minmax_with_all_data_types_with_sort_column" + 
sql(s"DROP TABLE IF EXISTS $normalTableName") + sql(s"DROP TABLE IF EXISTS $tableName") + def createAllDataTypeWithSortColumnTable(name : String) = { + sql( + s""" + | CREATE TABLE $name( + | booleanField boolean, + | shortField smallint, + | intField int, + | bigintField bigint, + | doubleField double, + | stringField string, + | timestampField timestamp, + | decimalField decimal(18,2), + | dateField date, + | charField string, + | floatField float, + | stringDictField string, + | stringSortField string, + | stringLocalDictField string, + | longStringField string + | ) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'LONG_STRING_COLUMNS'='longStringField', + | 'DICTIONARY_INCLUDE'='stringDictField', + | 'local_dictionary_enable'='true', + | 'local_dictionary_threshold'='10000', + | 'local_dictionary_include'='stringLocalDictField', + | 'SORT_COLUMNS'='booleanField, shortField, intField, bigintField, stringField, timestampField, dateField, charField, stringDictField, stringSortField, stringLocalDictField') + """.stripMargin) + } + // double/float/decimal/longstring cannot be sort_columns so we ignore them in SORT_COLUMNS + createAllDataTypeWithSortColumnTable(normalTableName) + createAllDataTypeWithSortColumnTable(tableName) + loadAllDataTypeTable(normalTableName) + loadAllDataTypeTable(tableName) + // create datamap on all supported datatype + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $tableName + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='booleanField, shortField, intField, bigintField, doubleField, stringField, timestampField, decimalField, dateField, charField, floatField, stringDictField, stringSortField, stringLocalDictField, longStringField' + | ) + """.stripMargin) + loadAllDataTypeTable(normalTableName) + loadAllDataTypeTable(tableName) + checkExistence(sql(s"SHOW DATAMAP ON TABLE $tableName"), + true, minmaxDataMapFactoryName, dataMapName) + checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), sql(s"SELECT 
COUNT(*) FROM $normalTableName")) + checkAnswer(sql(s"SELECT COUNT(*) FROM (SELECT * FROM $tableName WHERE dateField='2015-05-23' AND timestampField='2015-05-23 12:01:03') b"), + sql(s"SELECT COUNT(*) FROM (SELECT * FROM $normalTableName WHERE dateField='2015-05-23' AND timestampField='2015-05-23 12:01:03') b")) + checkExistence( + sql(s"EXPLAIN SELECT * FROM $tableName WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND doubleField=42.4 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2'"), + true, minmaxDataMapFactoryName, dataMapName) + checkAnswer( + sql(s"SELECT * FROM $tableName WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2'"), + sql(s"SELECT * FROM $normalTableName WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2'")) + checkAnswer( + sql(s"SELECT COUNT(*) FROM (SELECT * FROM $tableName WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND 
stringLocalDictField= 'local_dict2' AND longStringField='longstring2') b"), + sql(s"SELECT COUNT(*) FROM (SELECT * FROM $normalTableName WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2') b")) + + sql(s"DROP TABLE IF EXISTS $normalTableName") + sql(s"DROP TABLE IF EXISTS $tableName") + } + + test("test minmax datamap with empty values on index column") { + val normalTable = "minmax_normal_table" + val bloomDMSampleTable = "minmax_bloom_table" + val dataMapName = "minmax_dm_datamap" + sql(s"DROP TABLE IF EXISTS $normalTable") + sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") + sql(s"CREATE TABLE $normalTable(c1 string, c2 int, c3 string) STORED BY 'carbondata'") + sql(s"CREATE TABLE $bloomDMSampleTable(c1 string, c2 int, c3 string) STORED BY 'carbondata'") + // load data with empty value + sql(s"INSERT INTO $normalTable SELECT '', 1, 'xxx'") + sql(s"INSERT INTO $bloomDMSampleTable SELECT '', 1, 'xxx'") + sql(s"INSERT INTO $normalTable SELECT '', null, 'xxx'") + sql(s"INSERT INTO $bloomDMSampleTable SELECT '', null, 'xxx'") + sql( + s""" + | CREATE DATAMAP $dataMapName on table $bloomDMSampleTable + | using '$minmaxDataMapFactoryName' + | DMPROPERTIES('index_columns'='c1, c2') + """.stripMargin) + // load data with empty value + sql(s"INSERT INTO $normalTable SELECT '', 1, 'xxx'") + sql(s"INSERT INTO $bloomDMSampleTable SELECT '', 1, 'xxx'") + sql(s"INSERT INTO $normalTable SELECT '', null, 'xxx'") + sql(s"INSERT INTO $bloomDMSampleTable SELECT '', null, 'xxx'") + + // query on null fields + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable"), + sql(s"SELECT * FROM $normalTable")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE c1 = null"), + 
sql(s"SELECT * FROM $normalTable WHERE c1 = null")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE c1 = ''"), + sql(s"SELECT * FROM $normalTable WHERE c1 = ''")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE isNull(c1)"), + sql(s"SELECT * FROM $normalTable WHERE isNull(c1)")) + checkAnswer(sql(s"SELECT * FROM $bloomDMSampleTable WHERE isNull(c2)"), + sql(s"SELECT * FROM $normalTable WHERE isNull(c2)")) + + sql(s"DROP TABLE IF EXISTS $normalTable") + sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") + } + + test("create multiple datamaps vs create on datamap on multiple columns") { + val tableName1 = "minmax_all_data_types10" + val tableName2 = "minmax_all_data_types20" + val dataMapName1 = "dm_minmax_with_all_data_types10" + val dataMapName2Prefix = "dm_minmax_with_all_data_types2" + sql(s"DROP TABLE IF EXISTS $tableName1") + sql(s"DROP TABLE IF EXISTS $tableName2") + + createAllDataTypeTable(tableName1) + createAllDataTypeTable(tableName2) + loadAllDataTypeTable(tableName1) + loadAllDataTypeTable(tableName2) + // create one datamap on multiple index columns + sql( + s""" + | CREATE DATAMAP $dataMapName1 ON TABLE $tableName1 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='booleanField, shortField, intField, bigintField, doubleField, stringField, timestampField, decimalField, dateField, charField, floatField, stringDictField, stringSortField, stringLocalDictField, longStringField' + | ) + """.stripMargin) + // create multiple datamaps each on one index column + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}0 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='booleanField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}1 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='shortField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}2 ON TABLE $tableName2 
+ | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='intField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}3 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='bigintField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}4 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='doubleField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}5 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='stringField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}6 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='timestampField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}7 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='decimalField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}8 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='dateField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}9 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='charField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}10 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='floatField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}11 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='stringDictField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}12 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 
'INDEX_COLUMNS'='stringSortField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}13 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='stringLocalDictField' + | ) + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ${dataMapName2Prefix}14 ON TABLE $tableName2 + | USING '$minmaxDataMapFactoryName' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='longStringField' + | ) + """.stripMargin) + loadAllDataTypeTable(tableName1) + loadAllDataTypeTable(tableName2) + assert(sql(s"SHOW DATAMAP ON TABLE $tableName1").collect().length == 1) + assert(sql(s"SHOW DATAMAP ON TABLE $tableName2").collect().length == 15) + checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName1"), sql(s"SELECT COUNT(*) FROM $tableName2")) + checkExistence( + sql(s"EXPLAIN SELECT * FROM $tableName1 WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND doubleField=42.4 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2'"), + true, minmaxDataMapFactoryName, dataMapName1) + val allDataMapsOnTable2 = (0 to 14).map(p => s"$dataMapName2Prefix$p") + val existedString = (allDataMapsOnTable2 :+ minmaxDataMapFactoryName).toArray + checkExistence( + sql(s"EXPLAIN SELECT * FROM $tableName2 WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND doubleField=42.4 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2'"), + true, existedString: _*) + checkAnswer( + sql(s"SELECT * FROM $tableName1 WHERE booleanField=false AND shortField=2 AND 
intField=12 AND bigintField=102 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2'"), + sql(s"SELECT * FROM $tableName2 WHERE booleanField=false AND shortField=2 AND intField=12 AND bigintField=102 AND stringField='string2' AND timestampField='2015-05-23 12:01:03' AND decimalField=23.45 AND dateField='2015-05-23' AND charField='bbb' AND floatField=2.5 AND stringDictField='dict2' AND stringSortField='sort2' AND stringLocalDictField= 'local_dict2' AND longStringField='longstring2'")) + + sql(s"DROP TABLE IF EXISTS $tableName1") + sql(s"DROP TABLE IF EXISTS $tableName2") + } +} diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/minmax/MinMaxDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/minmax/MinMaxDataMapSuite.scala index d163d03b40c..a69189236e7 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/minmax/MinMaxDataMapSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/minmax/MinMaxDataMapSuite.scala @@ -24,8 +24,6 @@ import scala.util.Random import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.datamap.examples.MinMaxIndexDataMapFactory - class MinMaxDataMapSuite extends QueryTest with BeforeAndAfterAll { val inputFile = s"$resourcesPath/minmax_datamap_input.csv" val normalTable = "carbonNormal" @@ -55,7 +53,8 @@ class MinMaxDataMapSuite extends QueryTest with BeforeAndAfterAll { sql( s""" | CREATE DATAMAP $dataMapName ON TABLE $minMaxDMSampleTable - | USING '${classOf[MinMaxIndexDataMapFactory].getName}' + | USING '${classOf[MinMaxDataMapFactory].getName}' + | DMPROPERTIES('INDEX_COLUMNS'='id, city') """.stripMargin) sql( @@ -69,10 +68,9 @@ class 
MinMaxDataMapSuite extends QueryTest with BeforeAndAfterAll { | OPTIONS('header'='false') """.stripMargin) - sql(s"show datamap on table $minMaxDMSampleTable").show(false) + checkExistence(sql(s"show datamap on table $minMaxDMSampleTable"), true, dataMapName) // not that the table will use default dimension as sort_columns, so for the following cases, // the pruning result will differ. - // 1 blocklet checkAnswer(sql(s"select * from $minMaxDMSampleTable where id = 1"), sql(s"select * from $normalTable where id = 1")) // 6 blocklet