From 01d198fb9f041fc0375eeb3b25e63368ed6eadb7 Mon Sep 17 00:00:00 2001 From: sounakr Date: Thu, 28 Sep 2017 16:21:05 +0530 Subject: [PATCH] Min Max Example --- .../core/datamap/DataMapStoreManager.java | 16 +- .../carbondata/core/datamap/TableDataMap.java | 17 +- .../carbondata/core/datamap/dev/DataMap.java | 3 +- .../core/datamap/dev/DataMapWriter.java | 3 +- .../indexstore/SegmentPropertiesFetcher.java | 36 +++ .../blockletindex/BlockletDataMap.java | 2 +- .../blockletindex/BlockletDataMapFactory.java | 33 ++- datamap/examples/pom.xml | 111 +++++++++ .../datamap/examples/BlockletMinMax.java | 41 ++++ .../datamap/examples/MinMaxDataMap.java | 143 ++++++++++++ .../examples/MinMaxDataMapFactory.java | 114 +++++++++ .../datamap/examples/MinMaxDataWriter.java | 221 ++++++++++++++++++ .../examples/MinMaxIndexBlockDetails.java | 77 ++++++ .../MinMaxDataMapExample.scala | 77 ++++++ .../datamap/DataMapWriterSuite.scala | 2 +- pom.xml | 2 + .../datamap/DataMapWriterListener.java | 4 +- .../store/writer/AbstractFactDataWriter.java | 7 +- .../writer/v3/CarbonFactDataWriterImplV3.java | 3 + 19 files changed, 893 insertions(+), 19 deletions(-) create mode 100644 core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java create mode 100644 datamap/examples/pom.xml create mode 100644 datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java create mode 100644 datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java create mode 100644 datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java create mode 100644 datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java create mode 100644 datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java create mode 100644 
datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index 2b5d5cd370b..3851d190645 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; +import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -103,7 +104,7 @@ public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier, tableDataMaps = new ArrayList<>(); } TableDataMap dataMap = getTableDataMap(dataMapName, tableDataMaps); - if (dataMap != null) { + if (dataMap != null && dataMap.getDataMapName().equalsIgnoreCase(dataMapName)) { throw new RuntimeException("Already datamap exists in that path with type " + dataMapName); } @@ -113,12 +114,15 @@ public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier, DataMapFactory dataMapFactory = factoryClass.newInstance(); dataMapFactory.init(identifier, dataMapName); BlockletDetailsFetcher blockletDetailsFetcher; + SegmentPropertiesFetcher segmentPropertiesFetcher = null; if (dataMapFactory instanceof BlockletDetailsFetcher) { blockletDetailsFetcher = (BlockletDetailsFetcher) dataMapFactory; } else { blockletDetailsFetcher = getBlockletDetailsFetcher(identifier); } - dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory, 
blockletDetailsFetcher); + segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher; + dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory, blockletDetailsFetcher, + segmentPropertiesFetcher); } catch (Exception e) { LOGGER.error(e); throw new RuntimeException(e); @@ -128,11 +132,11 @@ public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier, return dataMap; } - private TableDataMap getTableDataMap(String dataMapName, - List tableDataMaps) { + private TableDataMap getTableDataMap(String dataMapName, List tableDataMaps) { TableDataMap dataMap = null; - for (TableDataMap tableDataMap: tableDataMaps) { - if (tableDataMap.getDataMapName().equals(dataMapName)) { + for (TableDataMap tableDataMap : tableDataMaps) { + if (tableDataMap.getDataMapName().equals(dataMapName) || (!tableDataMap.getDataMapName() + .equals(""))) { dataMap = tableDataMap; break; } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 3e5e9e42c2b..d374ce49546 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -22,11 +22,13 @@ import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.events.ChangeEvent; import org.apache.carbondata.core.events.EventListener; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; @@ -44,15 
+46,19 @@ public final class TableDataMap implements EventListener { private BlockletDetailsFetcher blockletDetailsFetcher; + private SegmentPropertiesFetcher segmentPropertiesFetcher; + /** * It is called to initialize and load the required table datamap metadata. */ public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName, - DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher) { + DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher, + SegmentPropertiesFetcher segmentPropertiesFetcher) { this.identifier = identifier; this.dataMapName = dataMapName; this.dataMapFactory = dataMapFactory; this.blockletDetailsFetcher = blockletDetailsFetcher; + this.segmentPropertiesFetcher = segmentPropertiesFetcher; } /** @@ -65,11 +71,13 @@ public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName, public List prune(List segmentIds, FilterResolverIntf filterExp) throws IOException { List blocklets = new ArrayList<>(); + SegmentProperties segmentProperties; for (String segmentId : segmentIds) { List pruneBlocklets = new ArrayList<>(); List dataMaps = dataMapFactory.getDataMaps(segmentId); + segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segmentId); for (DataMap dataMap : dataMaps) { - pruneBlocklets.addAll(dataMap.prune(filterExp)); + pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties)); } blocklets.addAll(addSegmentId(blockletDetailsFetcher .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId)); @@ -118,8 +126,9 @@ public List toDistributable(List segmentIds) throw public List prune(DataMapDistributable distributable, FilterResolverIntf filterExp) throws IOException { List detailedBlocklets = new ArrayList<>(); - List blocklets = dataMapFactory.getDataMap(distributable).prune(filterExp); - for (Blocklet blocklet: blocklets) { + List blocklets = dataMapFactory.getDataMap(distributable).prune(filterExp, + 
segmentPropertiesFetcher.getSegmentProperties(distributable.getSegmentId())); + for (Blocklet blocklet : blocklets) { ExtendedBlocklet detailedBlocklet = blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegmentId()); detailedBlocklet.setSegmentId(distributable.getSegmentId()); diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java index f6ea885dca4..233660e4f3e 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.List; +import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; @@ -40,7 +41,7 @@ public interface DataMap { * @param filterExp * @return */ - List prune(FilterResolverIntf filterExp); + List prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties); /** diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java index 28163d78614..413eaa5965d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java @@ -27,7 +27,7 @@ public interface DataMapWriter { * Start of new block notification. 
* @param blockId file name of the carbondata file */ - void onBlockStart(String blockId); + void onBlockStart(String blockId, String blockPath); /** * End of block notification @@ -45,7 +45,6 @@ public interface DataMapWriter { * @param blockletId sequence number of blocklet in the block */ void onBlockletEnd(int blockletId); - /** * Add the column pages row to the datamap, order of pages is same as `indexColumns` in * DataMapMeta returned in DataMapFactory. diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java new file mode 100644 index 00000000000..ec2ae93f084 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.indexstore; + +import java.io.IOException; + +import org.apache.carbondata.core.datastore.block.SegmentProperties; + +/** + * Fetches the detailed segmentProperties which has more information to execute the query + */ +public interface SegmentPropertiesFetcher { + + /** + * get the Segment properties based on the SegmentID. + * @param segmentId + * @return + * @throws IOException + */ + SegmentProperties getSegmentProperties(String segmentId) throws IOException; +} diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 0d7bb71428a..6d75652587f 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -306,7 +306,7 @@ private void createSchema(SegmentProperties segmentProperties) throws MemoryExce } @Override - public List prune(FilterResolverIntf filterExp) { + public List prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) { // getting the start and end index key based on filter for hitting the // selected block reference nodes based on filter resolver tree. 
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 5edc5b71e3d..8a8bf75ee93 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -30,6 +30,7 @@ import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -37,8 +38,12 @@ import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.hadoop.fs.FileSystem; @@ -49,13 +54,17 @@ /** * Table map for blocklet */ -public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher { +public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher, + SegmentPropertiesFetcher { private AbsoluteTableIdentifier identifier; // segmentId -> list of index 
file private Map> segmentMap = new HashMap<>(); + // segmentId -> SegmentProperties. + private Map segmentPropertiesMap = new HashMap<>(); + private Cache cache; @Override @@ -175,6 +184,7 @@ public List toDistributable(String segmentId) { @Override public void clear(String segmentId) { + segmentPropertiesMap.remove(segmentId); List blockIndexes = segmentMap.remove(segmentId); if (blockIndexes != null) { for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { @@ -219,4 +229,25 @@ public DataMapMeta getMeta() { // TODO: pass SORT_COLUMNS into this class return null; } + + @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException { + SegmentProperties segmentProperties = segmentPropertiesMap.get(segmentId); + if (segmentProperties == null) { + int[] columnCardinality; + List tableBlockIndexUniqueIdentifiers = + getTableBlockIndexUniqueIdentifiers(segmentId); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + List indexInfo = + fileFooterConverter.getIndexInfo(tableBlockIndexUniqueIdentifiers.get(0).getFilePath()); + for (DataFileFooter fileFooter : indexInfo) { + List columnInTable = fileFooter.getColumnInTable(); + if (segmentProperties == null) { + columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); + segmentProperties = new SegmentProperties(columnInTable, columnCardinality); + } + } + segmentPropertiesMap.put(segmentId, segmentProperties); + } + return segmentProperties; + } } diff --git a/datamap/examples/pom.xml b/datamap/examples/pom.xml new file mode 100644 index 00000000000..6832e62667d --- /dev/null +++ b/datamap/examples/pom.xml @@ -0,0 +1,111 @@ + + + + + 4.0.0 + + + org.apache.carbondata + carbondata-parent + 1.3.0-SNAPSHOT + ../../pom.xml + + + carbondata-datamap-examples + Apache CarbonData :: Datamap Examples + + + ${basedir}/../../dev + + + + + org.apache.carbondata + carbondata-spark2 + ${project.version} + + + org.apache.spark + 
spark-hive-thriftserver_2.10 + + + org.apache.spark + spark-repl_2.10 + + + org.apache.spark + spark-sql_2.10 + + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + org.apache.spark + spark-hive-thriftserver_${scala.binary.version} + + + org.apache.spark + spark-repl_${scala.binary.version} + + + + + src/minmaxdatamap/main/java + + + . + + CARBON_EXAMPLESLogResource.properties + + + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + + compile + + compile + + compile + + + process-resources + + compile + + + + + + maven-compiler-plugin + + 1.7 + 1.7 + + + + + + \ No newline at end of file diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java new file mode 100644 index 00000000000..e6968fe32ef --- /dev/null +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.examples; + + +public class BlockletMinMax { + private byte[][] Min; + + private byte[][] Max; + + public byte[][] getMin() { + return Min; + } + + public void setMin(byte[][] min) { + Min = min; + } + + public byte[][] getMax() { + return Max; + } + + public void setMax(byte[][] max) { + Max = max; + } +} diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java new file mode 100644 index 00000000000..2ad63276d54 --- /dev/null +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.examples; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.CarbonUtil; + +import com.google.gson.Gson; + +/** + * Datamap implementation for min max blocklet. 
+ */ +public class MinMaxDataMap implements DataMap { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(MinMaxDataMap.class.getName()); + + public static final String NAME = "clustered.minmax.btree.blocklet"; + + private String filePath; + + private MinMaxIndexBlockDetails[] readMinMaxDataMap; + + @Override public void init(String filePath) throws MemoryException, IOException { + this.filePath = filePath; + CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0"); + for (int i = 0; i < listFiles.length; i++) { + readMinMaxDataMap = readJson(listFiles[i].getPath()); + } + } + + private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String segmentId) { + String path = filePath.substring(0, filePath.lastIndexOf("/") + 1); + CarbonFile carbonFile = FileFactory.getCarbonFile(path); + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(".minmaxindex"); + } + }); + } + + public MinMaxIndexBlockDetails[] readJson(String filePath) throws IOException { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + MinMaxIndexBlockDetails[] readMinMax = null; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(filePath, FileFactory.getFileType(filePath)); + + try { + if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) { + return null; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT); + buffReader = new BufferedReader(inStream); + readMinMax = gsonObjectToRead.fromJson(buffReader, MinMaxIndexBlockDetails[].class); + } catch (IOException e) { + return null; + } finally { + CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); + } + return readMinMax; + } + + /** + * Block Pruning logic for
Min Max DataMap. + * + * @param filterExp + * @param segmentProperties + * @return + */ + @Override public List prune(FilterResolverIntf filterExp, + SegmentProperties segmentProperties) { + List blocklets = new ArrayList<>(); + + if (filterExp == null) { + for (int i = 0; i < readMinMaxDataMap.length; i++) { + blocklets.add(new Blocklet(readMinMaxDataMap[i].getFilePath(), + String.valueOf(readMinMaxDataMap[i].getBlockletId()))); + } + } else { + FilterExecuter filterExecuter = + FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); + int startIndex = 0; + while (startIndex < readMinMaxDataMap.length) { + BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(), + readMinMaxDataMap[startIndex].getMinValues()); + if (!bitSet.isEmpty()) { + blocklets.add(new Blocklet(readMinMaxDataMap[startIndex].getFilePath(), + String.valueOf(readMinMaxDataMap[startIndex].getBlockletId()))); + } + startIndex++; + } + } + return blocklets; + } + + @Override + public void clear() { + readMinMaxDataMap = null; + } + +} \ No newline at end of file diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java new file mode 100644 index 00000000000..b196d0d7373 --- /dev/null +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.examples; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.events.ChangeEvent; +import org.apache.carbondata.core.indexstore.schema.FilterType; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; + + +/** + * Min Max DataMap Factory + */ +public class MinMaxDataMapFactory implements DataMapFactory { + + private AbsoluteTableIdentifier identifier; + + @Override + public void init(AbsoluteTableIdentifier identifier, String dataMapName) { + this.identifier = identifier; + } + + /** + * createWriter will return the MinMaxDataWriter. + * @param segmentId + * @return + */ + @Override + public DataMapWriter createWriter(String segmentId) { + return new MinMaxDataWriter(); + } + + /** + * getDataMaps Factory method Initializes the Min Max Data Map and returns. + * @param segmentId + * @return + * @throws IOException + */ + @Override public List getDataMaps(String segmentId) throws IOException { + List dataMapList = new ArrayList<>(); + // Form a dataMap of Type MinMaxDataMap. 
+ MinMaxDataMap dataMap = new MinMaxDataMap(); + try { + dataMap.init(identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator); + } catch (MemoryException ex) { + + } + dataMapList.add(dataMap); + return dataMapList; + } + + /** + * + * @param segmentId + * @return + */ + @Override public List toDistributable(String segmentId) { + return null; + } + + /** + * Clear the DataMap. + * @param segmentId + */ + @Override public void clear(String segmentId) { + } + + /** + * Clearing the data map. + */ + @Override + public void clear() { + } + + @Override public DataMap getDataMap(DataMapDistributable distributable) { + return null; + } + + @Override + public void fireEvent(ChangeEvent event) { + + } + + @Override + public DataMapMeta getMeta() { + return new DataMapMeta(new ArrayList(Arrays.asList("c2")), FilterType.EQUALTO); + } +} \ No newline at end of file diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java new file mode 100644 index 00000000000..78544d3ad7e --- /dev/null +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.datamap.examples;

import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;

import com.google.gson.Gson;

/**
 * Example {@link DataMapWriter} that tracks the min and max value of the first
 * column of each blocklet while a block is written, and persists the collected
 * values as a JSON ".minmaxindex" file next to the carbondata file.
 */
public class MinMaxDataWriter implements DataMapWriter {

  // Log under this class's own name (the original registered the logger
  // against TableInfo.class, which mislabels every log line).
  private static final LogService LOGGER =
      LogServiceFactory.getLogService(MinMaxDataWriter.class.getName());

  // Min/max accumulated over the pages of the current blocklet; by the
  // original's convention index 1 holds the tracked column's value.
  private byte[][] pageLevelMin, pageLevelMax;

  // Min/max of the blocklet most recently finished within the current block.
  private byte[][] blockletLevelMin, blockletLevelMax;

  // blockletId -> min/max of that blocklet, for the block currently written.
  private Map<Integer, BlockletMinMax> blockMinMaxMap;

  // Full path of the carbondata file of the block being written.
  private String blockPath;

  @Override public void onBlockStart(String blockId, String blockPath) {
    // Reset all per-block state so nothing leaks from the previous block.
    pageLevelMax = null;
    pageLevelMin = null;
    blockletLevelMax = null;
    blockletLevelMin = null;
    blockMinMaxMap = new HashMap<>();
    this.blockPath = blockPath;
  }

  @Override public void onBlockEnd(String blockId) {
    updateMinMaxIndex(blockId);
  }

  @Override public void onBlockletStart(int blockletId) {
  }

  @Override public void onBlockletEnd(int blockletId) {
    updateBlockletMinMax(blockletId);
  }

  @Override
  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
    // As part of this example the min/max of the first column is extracted
    // manually; the same values could instead be read from the page
    // statistics via pages[0].getStatistics().
    for (int rowIndex = 0; rowIndex < pages[0].getPageSize(); rowIndex++) {
      byte[] rowBytes = pages[0].getBytes(rowIndex);
      // Strip the 2-byte length prefix. BUG FIX: allocate a fresh array per
      // row — the original reused one buffer, so the retained min/max aliased
      // memory that later rows overwrote; it also inspected only row 0 on the
      // first page and assumed every row has the same length.
      byte[] value = new byte[rowBytes.length - 2];
      System.arraycopy(rowBytes, 2, value, 0, value.length);
      if (pageLevelMin == null || pageLevelMax == null) {
        pageLevelMin = new byte[2][];
        pageLevelMax = new byte[2][];
        pageLevelMin[1] = value;
        pageLevelMax[1] = value;
      } else {
        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMin[1], value) > 0) {
          pageLevelMin[1] = value;
        }
        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMax[1], value) < 0) {
          pageLevelMax[1] = value;
        }
      }
    }
  }

  /**
   * Fold the page-level min/max of the finished blocklet into the block-level
   * map, keyed by blocklet id.
   */
  private void updateBlockletMinMax(int blockletId) {
    if (blockletLevelMax == null || blockletLevelMin == null) {
      blockletLevelMax = new byte[2][];
      blockletLevelMin = new byte[2][];
      if (pageLevelMax != null || pageLevelMin != null) {
        blockletLevelMin = pageLevelMin;
        blockletLevelMax = pageLevelMax;
      }
    } else {
      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockletLevelMin[1], pageLevelMin[1]) > 0) {
        blockletLevelMin = pageLevelMin;
      }
      // BUG FIX: the original compared with "> 0" here, replacing the running
      // max with a SMALLER value; the max must only move when the candidate
      // compares greater.
      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockletLevelMax[1], pageLevelMax[1]) < 0) {
        blockletLevelMax = pageLevelMax;
      }
    }
    BlockletMinMax blockletMinMax = new BlockletMinMax();
    blockletMinMax.setMax(blockletLevelMax);
    blockletMinMax.setMin(blockletLevelMin);
    blockMinMaxMap.put(blockletId, blockletMinMax);
  }

  /**
   * Entry point called at block end: build and persist the min-max index for
   * the block identified by {@code blockId}.
   */
  public void updateMinMaxIndex(String blockId) {
    constructMinMaxIndex(blockId);
  }

  /**
   * Construct the min-max index from every blocklet recorded for this block
   * and write it out as a JSON file.
   *
   * @param blockId id of the block whose index file is written
   */
  public void constructMinMaxIndex(String blockId) {
    List<MinMaxIndexBlockDetails> tempMinMaxIndexBlockDetails = loadBlockDetails();
    try {
      writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockPath, blockId);
    } catch (IOException ex) {
      LOGGER.info(" Unable to write the file");
    }
  }

  /**
   * Convert the accumulated per-blocklet min/max map into a list of
   * serializable {@link MinMaxIndexBlockDetails} entries.
   */
  private List<MinMaxIndexBlockDetails> loadBlockDetails() {
    List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails = new ArrayList<>();
    for (int index = 0; index < blockMinMaxMap.size(); index++) {
      // BUG FIX: allocate a new details object per blocklet; the original
      // reused a single instance, so every list entry aliased the values of
      // the last blocklet.
      MinMaxIndexBlockDetails details = new MinMaxIndexBlockDetails();
      details.setMinValues(blockMinMaxMap.get(index).getMin());
      details.setMaxValues(blockMinMaxMap.get(index).getMax());
      details.setBlockletId(index);
      details.setFilePath(this.blockPath);
      minMaxIndexBlockDetails.add(details);
    }
    return minMaxIndexBlockDetails;
  }

  /**
   * Write the collected details to "&lt;blockId&gt;.minmaxindex" (JSON format)
   * in the same directory as the carbondata file.
   *
   * @param minMaxIndexBlockDetails entries to serialize, one per blocklet
   * @param blockPath full path of the carbondata file of this block
   * @param blockId id of the block, used as the index file name
   * @throws IOException if the index file cannot be created
   */
  public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails,
      String blockPath, String blockId) throws IOException {
    String filePath = blockPath.substring(0, blockPath.lastIndexOf(File.separator) + 1) + blockId
        + ".minmaxindex";
    BufferedWriter brWriter = null;
    DataOutputStream dataOutStream = null;
    try {
      FileFactory.createNewFile(filePath, FileFactory.getFileType(filePath));
      dataOutStream = FileFactory.getDataOutputStream(filePath, FileFactory.getFileType(filePath));
      Gson gsonObjectToWrite = new Gson();
      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
      String minmaxIndexData = gsonObjectToWrite.toJson(minMaxIndexBlockDetails);
      brWriter.write(minmaxIndexData);
    } catch (IOException ioe) {
      LOGGER.info("Error in writing minMaxindex file");
    } finally {
      // Flush before closing so a close failure cannot silently drop data.
      if (null != brWriter) {
        brWriter.flush();
      }
      if (null != dataOutStream) {
        dataOutStream.flush();
      }
      CarbonUtil.closeStreams(brWriter, dataOutStream);
    }
  }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.Serializable;

/**
 * Serializable holder for the min-max index entry of one blocklet; instances
 * are serialized to JSON into a block's ".minmaxindex" file.
 */
public class MinMaxIndexBlockDetails implements Serializable {
  private static final long serialVersionUID = 1206104914911491724L;

  /**
   * Min value of a column of one blocklet, bit-packed.
   */
  private byte[][] minValues;

  /**
   * Max value of a column of one blocklet, bit-packed.
   */
  private byte[][] maxValues;

  /**
   * filePath pointing to the block.
   */
  private String filePath;

  /**
   * Id of the blocklet within the block.
   * (Field renamed from "BlockletId": private fields use lowerCamelCase.)
   */
  private Integer blockletId;

  public byte[][] getMinValues() {
    return minValues;
  }

  public void setMinValues(byte[][] minValues) {
    this.minValues = minValues;
  }

  public byte[][] getMaxValues() {
    return maxValues;
  }

  public void setMaxValues(byte[][] maxValues) {
    this.maxValues = maxValues;
  }

  public String getFilePath() {
    return filePath;
  }

  public void setFilePath(String filePath) {
    this.filePath = filePath;
  }

  public Integer getBlockletId() {
    return blockletId;
  }

  public void setBlockletId(Integer blockletId) {
    this.blockletId = blockletId;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.datamap.examples

import java.io.File

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.util.CarbonProperties

/**
 * Example that registers the MinMax datamap on a table, loads sample data into
 * it (which triggers the MinMaxDataWriter callbacks), then queries the table.
 */
object MinMaxDataMapExample {
  def main(args: Array[String]): Unit = {

    val rootPath = new File(this.getClass.getResource("/").getPath).getCanonicalPath
    // BUG FIX: the original used "dataMap/examples" for the store while every
    // other path uses "datamap/examples"; on a case-sensitive file system the
    // store would land in a different directory than the rest of the output.
    val storeLocation = s"$rootPath/datamap/examples/target/store"
    val warehouse = s"$rootPath/datamap/examples/target/warehouse"
    val metastoredb = s"$rootPath/datamap/examples/target"

    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

    import org.apache.spark.sql.CarbonSession._

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("CarbonDataMapExample")
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreateCarbonSession(storeLocation)

    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    // Register the datamap writer before any load so the writer callbacks fire.
    DataMapStoreManager.getInstance().createAndRegisterDataMap(
      AbsoluteTableIdentifier.from(storeLocation, "default", "carbonminmax"),
      classOf[MinMaxDataMapFactory].getName,
      MinMaxDataMap.NAME)

    spark.sql("DROP TABLE IF EXISTS carbonminmax")

    // 33000 rows ensures the load spans multiple pages/blocklets.
    val df = spark.sparkContext.parallelize(1 to 33000)
      .map(x => ("a", "b", x))
      .toDF("c1", "c2", "c3")

    // Save the dataframe to a carbon file; this drives the MinMax writer.
    df.write
      .format("carbondata")
      .option("tableName", "carbonminmax")
      .mode(SaveMode.Overwrite)
      .save()

    // Query the table.
    spark.sql("select c2 from carbonminmax").show(20, false)
    spark.sql("select c2 from carbonminmax where c2 = 'b'").show(20, false)
    spark.sql("DROP TABLE IF EXISTS carbonminmax")

  }
}
blockId, String blockPath) { for (List writers : registry.values()) { for (DataMapWriter writer : writers) { - writer.onBlockStart(blockId); + writer.onBlockStart(blockId, blockPath); } } } diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 972e4143b2b..cebfb13fdba 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -269,7 +269,7 @@ protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded) private void notifyDataMapBlockStart() { if (listener != null) { - listener.onBlockStart(carbonDataFileName); + listener.onBlockStart(carbonDataFileName, constructFactFileFullPath()); } } @@ -280,6 +280,11 @@ private void notifyDataMapBlockEnd() { blockletId = 0; } + private String constructFactFileFullPath() { + String factFilePath = + this.dataWriterVo.getCarbonDataDirectoryPath() + File.separator + this.carbonDataFileName; + return factFilePath; + } /** * Finish writing current file. 
It will flush stream, copy and rename temp file to final file * @param copyInCurrentThread set to false if want to do data copy in a new thread diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java index d8ae8ffe601..c366f63fe8a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java @@ -43,6 +43,7 @@ import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo; + /** * Below class will be used to write the data in V3 format * > @@ -157,6 +158,8 @@ private void addPageData(TablePage tablePage) { } } + + /** * Write the collect blocklet data (blockletDataHolder) to file */