From c01d4cd6dfa172564eff2cf53cadab5cb3d2b46a Mon Sep 17 00:00:00 2001 From: sounakr Date: Wed, 6 Sep 2017 10:15:25 +0530 Subject: [PATCH] Min Max DataMap 1st Phase --- .../carbondata/core/cache/CacheType.java | 3 + .../blockletindex/DataMapMinMax.java | 454 ++++++++++++++++++ .../datamap/DataMapMinMaxFactory.java | 122 +++++ .../datamap/DataMapMinMaxWriter.scala | 163 +++++++ .../datamap/DataMapWriterSuite.scala | 7 + .../testsuite/datamap/MinMaxDataMap.java | 193 ++++++++ 6 files changed, 942 insertions(+) create mode 100644 core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/DataMapMinMax.java create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapMinMaxFactory.java create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapMinMaxWriter.scala create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/MinMaxDataMap.java diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java index ab51ff20e59..35e5d3d3a67 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java @@ -61,6 +61,9 @@ public class CacheType { public static final CacheType DRIVER_BLOCKLET_DATAMAP = new CacheType("driver_blocklet_datamap"); + public static final CacheType + DRIVER_BLOCKLET_MINMAX_DATAMAP = new CacheType("driver_blocklet_minmax_datamap"); + /** * cacheName which is unique name for a cache */ diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/DataMapMinMax.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/DataMapMinMax.java new file mode 100644 index 00000000000..b050a455e9e --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/DataMapMinMax.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Comparator; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.Cacheable; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.datastore.IndexKey; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.indexstore.BlockletDetailInfo; +import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; +import org.apache.carbondata.core.indexstore.schema.DataMapSchema; +import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; + +/** + * Datamap implementation for blocklet. 
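+ * This Min/Max variant stores, for every blocklet, the start key, min and max values,
+ * row count, file path, page count, version, schema-updated timestamp and the serialized
+ * BlockletInfo in an UnsafeMemoryDMStore, and prunes blocklets by checking the filter
+ * against the stored min/max values.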
+ */ +public class DataMapMinMax implements DataMap, Cacheable { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(DataMapMinMax.class.getName()); + + public static final String NAME = "clustered.btree.blocklet"; + + private static int KEY_INDEX = 0; + + private static int MIN_VALUES_INDEX = 1; + + private static int MAX_VALUES_INDEX = 2; + + private static int ROW_COUNT_INDEX = 3; + + private static int FILE_PATH_INDEX = 4; + + private static int PAGE_COUNT_INDEX = 5; + + private static int VERSION_INDEX = 6; + + private static int SCHEMA_UPADATED_TIME_INDEX = 7; + + private static int BLOCK_INFO_INDEX = 8; + + private UnsafeMemoryDMStore unsafeMemoryDMStore; + + private SegmentProperties segmentProperties; + + private int[] columnCardinality; + + @Override + public void init(String filePath) throws IOException, MemoryException { + long startTime = System.currentTimeMillis(); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + List indexInfo = fileFooterConverter.getIndexInfo(filePath); + for (DataFileFooter fileFooter : indexInfo) { + List columnInTable = fileFooter.getColumnInTable(); + if (segmentProperties == null) { + columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); + segmentProperties = new SegmentProperties(columnInTable, columnCardinality); + createSchema(segmentProperties); + } + TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); + if (fileFooter.getBlockletList() == null || fileFooter.getBlockletList().size() == 0) { + LOGGER + .info("Reading carbondata file footer to get blocklet info " + blockInfo.getFilePath()); + fileFooter = CarbonUtil.readMetadatFile(blockInfo); + } + + loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath()); + } + if (unsafeMemoryDMStore != null) { + unsafeMemoryDMStore.finishWriting(); + } + LOGGER.info("Time taken to load blocklet datamap from file : " + filePath + "is " + + (System.currentTimeMillis() - startTime)); + } + + + // THis routine is suppose to write the file into the disk. 
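+  // Ordinal layout of each unsafe row written by loadToUnsafe, matching the *_INDEX
+  // constants above: [0] start key, [1] min values, [2] max values, [3] row count,
+  // [4] file path, [5] page count, [6] version, [7] schema updated time,
+  // [8] serialized BlockletInfo. As written, loadToUnsafe only populates the in-memory
+  // unsafe store; nothing is persisted to disk here.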
+ private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties, + String filePath) { + int[] minMaxLen = segmentProperties.getColumnsValueSize(); + List blockletList = fileFooter.getBlockletList(); + DataMapSchema[] schema = unsafeMemoryDMStore.getSchema(); + for (int index = 0; index < blockletList.size(); index++) { + DataMapRow row = new DataMapRowImpl(schema); + int ordinal = 0; + BlockletInfo blockletInfo = blockletList.get(index); + + // add start key as index key + row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++); + + BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex(); + row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal); + ordinal++; + row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal); + ordinal++; + + row.setInt(blockletInfo.getNumberOfRows(), ordinal++); + + // add file path + byte[] filePathBytes = + filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); + row.setByteArray(filePathBytes, ordinal++); + + // add pages + row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++); + + // add version number + row.setShort(fileFooter.getVersionId().number(), ordinal++); + + // add schema updated time + row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++); + + // add blocklet info + byte[] serializedData; + try { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutput dataOutput = new DataOutputStream(stream); + blockletInfo.write(dataOutput); + serializedData = stream.toByteArray(); + row.setByteArray(serializedData, ordinal); + unsafeMemoryDMStore.addIndexRowToUnsafe(row); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] minValues) { + DataMapSchema[] minSchemas = + ((DataMapSchema.StructDataMapSchema) dataMapSchema).getChildSchemas(); + DataMapRow minRow = new DataMapRowImpl(minSchemas); + int minOrdinal = 0; + // min value adding + for (int i = 0; i < minMaxLen.length; i++) { + minRow.setByteArray(minValues[i], minOrdinal++); + } + return minRow; + } + + private void createSchema(SegmentProperties segmentProperties) throws MemoryException { + List indexSchemas = new ArrayList<>(); + + // Index key + indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY)); + int[] minMaxLen = segmentProperties.getColumnsValueSize(); + // do it 2 times, one for min and one for max. + for (int k = 0; k < 2; k++) { + DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length]; + for (int i = 0; i < minMaxLen.length; i++) { + if (minMaxLen[i] <= 0) { + mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY); + } else { + mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataType.BYTE_ARRAY, minMaxLen[i]); + } + } + DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(DataType.STRUCT, mapSchemas); + indexSchemas.add(mapSchema); + } + + // for number of rows. + indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.INT)); + + // for table block path + indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY)); + + // for number of pages. + indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT)); + + // for version number. + indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT)); + + // for schema updated time. 
+ indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.LONG)); + + //for blocklet info + indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY)); + + unsafeMemoryDMStore = + new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()])); + } + + @Override + public List prune(FilterResolverIntf filterExp) { + + // getting the start and end index key based on filter for hitting the + // selected block reference nodes based on filter resolver tree. + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("preparing the start and end key for finding" + + "start and end block as per filter resolver"); + } + List blocklets = new ArrayList<>(); + Comparator comparator = + new BlockletDMComparator(segmentProperties.getColumnsValueSize(), + segmentProperties.getNumberOfSortColumns(), + segmentProperties.getNumberOfNoDictSortColumns()); + List listOfStartEndKeys = new ArrayList(2); + FilterUtil + .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys); + // reading the first value from list which has start key + IndexKey searchStartKey = listOfStartEndKeys.get(0); + // reading the last value from list which has end key + IndexKey searchEndKey = listOfStartEndKeys.get(1); + if (null == searchStartKey && null == searchEndKey) { + try { + // TODO need to handle for no dictionary dimensions + searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties); + // TODO need to handle for no dictionary dimensions + searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties); + } catch (KeyGenException e) { + return null; + } + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Successfully retrieved the start and end key" + "Dictionary Start Key: " + Arrays + .toString(searchStartKey.getDictionaryKeys()) + "No Dictionary Start Key " + Arrays + .toString(searchStartKey.getNoDictionaryKeys()) + "Dictionary End Key: " + Arrays + .toString(searchEndKey.getDictionaryKeys()) + "No Dictionary End Key " + Arrays + .toString(searchEndKey.getNoDictionaryKeys())); + } + if (filterExp == null) { + int rowCount = unsafeMemoryDMStore.getRowCount(); + for (int i = 0; i < rowCount; i++) { + DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(i); + blocklets.add(createBlocklet(unsafeRow, i)); + } + } else { + int startIndex = findStartIndex(convertToRow(searchStartKey), comparator); + int endIndex = findEndIndex(convertToRow(searchEndKey), comparator); + FilterExecuter filterExecuter = + FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); + while (startIndex <= endIndex) { + DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex); + BitSet bitSet = filterExecuter.isScanRequired(getMinMaxValue(unsafeRow, MAX_VALUES_INDEX), + getMinMaxValue(unsafeRow, MIN_VALUES_INDEX)); + if (!bitSet.isEmpty()) { + blocklets.add(createBlocklet(unsafeRow, startIndex)); + } + startIndex++; + } + } + + return blocklets; + } + + private byte[][] getMinMaxValue(DataMapRow row, int index) { + DataMapRow minMaxRow = row.getRow(index); + byte[][] minMax = new byte[minMaxRow.getColumnCount()][]; + for (int i = 0; i < minMax.length; i++) { + minMax[i] = minMaxRow.getByteArray(i); + } + return minMax; + } + + private Blocklet createBlocklet(DataMapRow row, int blockletId) { + Blocklet blocklet = new Blocklet( + new String(row.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS), + blockletId + ""); + BlockletDetailInfo detailInfo = new BlockletDetailInfo(); + 
detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX)); + detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX)); + detailInfo.setVersionNumber(row.getShort(VERSION_INDEX)); + detailInfo.setDimLens(columnCardinality); + detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX)); + BlockletInfo blockletInfo = new BlockletInfo(); + try { + byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX); + ByteArrayInputStream stream = new ByteArrayInputStream(byteArray); + DataInputStream inputStream = new DataInputStream(stream); + blockletInfo.readFields(inputStream); + inputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + detailInfo.setBlockletInfo(blockletInfo); + blocklet.setDetailInfo(detailInfo); + return blocklet; + } + + /** + * Binary search used to get the first tentative index row based on + * search key + * + * @param key search key + * @return first tentative block + */ + private int findStartIndex(DataMapRow key, Comparator comparator) { + int childNodeIndex; + int low = 0; + int high = unsafeMemoryDMStore.getRowCount() - 1; + int mid = 0; + int compareRes = -1; + // + while (low <= high) { + mid = (low + high) >>> 1; + // compare the entries + compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid)); + if (compareRes < 0) { + high = mid - 1; + } else if (compareRes > 0) { + low = mid + 1; + } else { + // if key is matched then get the first entry + int currentPos = mid; + while (currentPos - 1 >= 0 + && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) { + currentPos--; + } + mid = currentPos; + break; + } + } + // if compare result is less than zero then we + // and mid is more than 0 then we need to previous block as duplicates + // record can be present + if (compareRes < 0) { + if (mid > 0) { + mid--; + } + childNodeIndex = mid; + } else { + childNodeIndex = mid; + } + // get the leaf child + return childNodeIndex; + } + + /** + * Binary search used to get the last tentative block based on + * search key + * + * @param key search key + * @return first tentative block + */ + private int findEndIndex(DataMapRow key, Comparator comparator) { + int childNodeIndex; + int low = 0; + int high = unsafeMemoryDMStore.getRowCount() - 1; + int mid = 0; + int compareRes = -1; + // + while (low <= high) { + mid = (low + high) >>> 1; + // compare the entries + compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid)); + if (compareRes < 0) { + high = mid - 1; + } else if (compareRes > 0) { + low = mid + 1; + } else { + int currentPos = mid; + // if key is matched then get the first entry + while (currentPos + 1 < unsafeMemoryDMStore.getRowCount() + && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) { + currentPos++; + } + mid = currentPos; + break; + } + } + // if compare result is less than zero then we + // and mid is more than 0 then we need to previous block as duplicates + // record can be present + if (compareRes < 0) { + if (mid > 0) { + mid--; + } + childNodeIndex = mid; + } else { + childNodeIndex = mid; + } + return childNodeIndex; + } + + private DataMapRow convertToRow(IndexKey key) { + ByteBuffer buffer = + ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8); + buffer.putInt(key.getDictionaryKeys().length); + buffer.putInt(key.getNoDictionaryKeys().length); + buffer.put(key.getDictionaryKeys()); + buffer.put(key.getNoDictionaryKeys()); + DataMapRowImpl dataMapRow = new 
DataMapRowImpl(unsafeMemoryDMStore.getSchema()); + dataMapRow.setByteArray(buffer.array(), 0); + return dataMapRow; + } + + @Override + public void clear() { + unsafeMemoryDMStore.freeMemory(); + unsafeMemoryDMStore = null; + segmentProperties = null; + } + + @Override + public long getFileTimeStamp() { + return 0; + } + + @Override + public int getAccessCount() { + return 0; + } + + @Override + public long getMemorySize() { + return unsafeMemoryDMStore.getMemoryUsed(); + } + +} diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapMinMaxFactory.java b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapMinMaxFactory.java new file mode 100644 index 00000000000..1134f9e9b68 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapMinMaxFactory.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.datamap; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.events.ChangeEvent; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.indexstore.schema.FilterType; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; + +public class DataMapMinMaxFactory implements DataMapFactory { + + private AbsoluteTableIdentifier identifier; + + // segmentId -> list of index file + private Map> segmentMap = new HashMap<>(); + + private Cache cache; + + @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) { + this.identifier = identifier; + cache = CacheProvider.getInstance() + .createCache(CacheType.DRIVER_BLOCKLET_MINMAX_DATAMAP, identifier.getStorePath()); + } + + @Override public DataMapWriter createWriter(String segmentId) { + return DataMapMinMaxWriter.dataMapMinMaxWriter(); + } + + @Override public List getDataMaps(String segmentId) 
throws IOException { + List tableBlockIndexUniqueIdentifiers = + segmentMap.get(segmentId); + if (tableBlockIndexUniqueIdentifiers == null) { + tableBlockIndexUniqueIdentifiers = new ArrayList<>(); + CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(segmentId); + for (int i = 0; i < listFiles.length; i++) { + tableBlockIndexUniqueIdentifiers.add( + new TableBlockIndexUniqueIdentifier(identifier, segmentId, listFiles[i].getName())); + } + } + return cache.getAll(tableBlockIndexUniqueIdentifiers); + } + + private CarbonFile[] getCarbonMinMaxIndexFiles(String segmentId) { + String path = identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId; + CarbonFile carbonFile = FileFactory.getCarbonFile(path); + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(".carbonindexminmax"); + } + }); + } + + @Override public DataMap getDataMap(DataMapDistributable distributable) { + return null; + } + + @Override public List toDistributable(String segmentId) { + return null; + } + + @Override public void fireEvent(ChangeEvent event) { + + } + + @Override public void clear(String segmentId) { + List blockIndexes = segmentMap.remove(segmentId); + if (blockIndexes != null) { + for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { + DataMap dataMap = cache.getIfPresent(blockIndex); + dataMap.clear(); + cache.invalidate(blockIndex); + } + } + } + + @Override public void clear() { + for (String segmentId: segmentMap.keySet()) { + clear(segmentId); + } + } + + @Override public DataMapMeta getMeta() { + return new DataMapMeta(Arrays.asList("c2"), FilterType.EQUALTO); + } +} + + + + diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapMinMaxWriter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapMinMaxWriter.scala new file mode 100644 index 00000000000..9104437c675 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapMinMaxWriter.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.datamap + +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap} +import org.apache.carbondata.core.datamap.dev.DataMapWriter +import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.DataType +import org.apache.carbondata.core.util.CarbonProperties + + +class DataMapMinMaxWriter extends QueryTest with BeforeAndAfterAll { + + def buildTestData(numRows: Int): DataFrame = { + import sqlContext.implicits._ + sqlContext.sparkContext.parallelize(1 to numRows) + .map(x => ("a", "b", x)) + .toDF("c1", "c2", "c3") + } + + def dropTable(): Unit = { + sql("DROP TABLE IF EXISTS carbon1") + sql("DROP TABLE IF EXISTS carbon2") + } + + override def beforeAll { + dropTable() + } + + test("test write datamap 2 pages") { + // register datamap writer + TableDataMap dataMapMinMax = DataMapStoreManager.getInstance().createAndRegisterDataMap( + AbsoluteTableIdentifier.from(storeLocation, "default", "carbonMinMax"), + classOf[DataMapMinMaxClass].getName, + "test") + + val df = buildTestData(33000) + + // save dataframe to carbon file + df.write + .format("carbondata") + .option("tableName", "carbonMinMax") + .mode(SaveMode.Overwrite) + .save() + + + assert(DataMapWriterSuite.callbackSeq.head.contains("block start")) + assert(DataMapWriterSuite.callbackSeq.last.contains("block end")) + assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq( + "blocklet start 0", + "add page data: blocklet 0, page 0", + "add page data: blocklet 0, page 1", + "blocklet end: 0" + )) + DataMapWriterSuite.callbackSeq = Seq() + } + + test("test write datamap 2 blocklet") { + // register datamap writer + DataMapStoreManager.getInstance().createAndRegisterDataMap( + AbsoluteTableIdentifier.from(storeLocation, "default", "carbon2"), + classOf[C2DataMapFactory].getName, + "test") + + CarbonProperties.getInstance() + .addProperty("carbon.blockletgroup.size.in.mb", "1") + + val df = buildTestData(300000) + + // save dataframe to carbon file + df.write + .format("carbondata") + .option("tableName", "carbon2") + .mode(SaveMode.Overwrite) + .save() + + assert(DataMapWriterSuite.callbackSeq.head.contains("block start")) + assert(DataMapWriterSuite.callbackSeq.last.contains("block end")) + assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq( + "blocklet start 0", + "add page data: blocklet 0, page 0", + "add page data: blocklet 0, page 1", + "add page data: blocklet 0, page 2", + "add page data: blocklet 0, page 3", + "add page data: blocklet 0, page 4", + "add page data: blocklet 0, page 5", + "add page data: blocklet 0, page 6", + "add page data: blocklet 0, page 7", + "blocklet end: 0", + "blocklet start 1", + "add page data: blocklet 1, page 0", + "add page data: blocklet 1, page 1", + "blocklet end: 1" + )) + DataMapWriterSuite.callbackSeq = Seq() + } + + override def afterAll { + dropTable() + } +} + +object DataMapMinMaxWriter { + + val dataMapMinMaxWriter = new DataMapWriter { + + var callbackSeq: Seq[String] = Seq[String]() + + override def onPageAdded( + blockletId: Int, + pageId: Int, + pages: Array[ColumnPage]): Unit = { + assert(pages.length == 1) + assert(pages(0).getDataType == DataType.STRING) + val bytes: 
Array[Byte] = pages(0).getByteArrayPage()(0) + assert(bytes.sameElements(Seq(0, 1, 'b'.toByte))) + callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId" + } + + def writeMinMax(filePath: String):Unit = { + updateMinMaxIndex(filePath) + } + + override def onBlockletEnd(blockletId: Int): Unit = { + callbackSeq :+= s"blocklet end: $blockletId" + writeMinMax(filePath); + } + + override def onBlockEnd(blockId: String): Unit = { + callbackSeq :+= s"block end $blockId" + } + + override def onBlockletStart(blockletId: Int): Unit = { + callbackSeq :+= s"blocklet start $blockletId" + } + + override def onBlockStart(blockId: String): Unit = { + callbackSeq :+= s"block start $blockId" + } + + } +} + diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index e8ec6ad751d..594186cd22c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -115,6 +115,7 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { classOf[C2DataMapFactory].getName, "test") + CarbonProperties.getInstance() .addProperty("carbon.blockletgroup.size.in.mb", "1") @@ -166,7 +167,10 @@ object DataMapWriterSuite { assert(pages(0).getDataType == DataType.STRING) val bytes: Array[Byte] = pages(0).getByteArrayPage()(0) assert(bytes.sameElements(Seq(0, 1, 'b'.toByte))) + + /* callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId" + */ } override def onBlockletEnd(blockletId: Int): Unit = { @@ -175,6 +179,9 @@ object DataMapWriterSuite { override def onBlockEnd(blockId: String): Unit = { callbackSeq :+= s"block end $blockId" + // Calculate the Min and Max values of the block and store it into an index file. 
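+      // A possible shape for that step (illustrative only; `carbonDataFilePath` is an
+      // assumed input that the current callback does not provide):
+      //   new MinMaxDataMap().updateMinMaxIndex(carbonDataFilePath)
+      // MinMaxDataMap.updateMinMaxIndex (added in this patch) reads the file footer and
+      // loads the per-blocklet min/max rows into its unsafe store.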
+ + } override def onBlockletStart(blockletId: Int): Unit = { diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/MinMaxDataMap.java b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/MinMaxDataMap.java new file mode 100644 index 00000000000..481b57cb6d6 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/MinMaxDataMap.java @@ -0,0 +1,193 @@ +package org.apache.carbondata.spark.testsuite.datamap; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; +import org.apache.carbondata.core.indexstore.schema.DataMapSchema; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; + +public class MinMaxDataMap implements DataMap { + + public static final String NAME = "clustered.btree.minmax"; + + private static int KEY_INDEX = 0; + + private static int MIN_VALUES_INDEX = 1; + + private static int MAX_VALUES_INDEX = 2; + + private UnsafeMemoryDMStore unsafeMemoryDMStore; + + private SegmentProperties segmentProperties; + + private int[] columnCardinality; + + + @Override public void init(String filePath) throws MemoryException, IOException { + long startTime = System.currentTimeMillis(); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + List indexInfo = fileFooterConverter.getIndexInfo(filePath); + for (DataFileFooter fileFooter : indexInfo) { + List columnInTable = fileFooter.getColumnInTable(); + if (segmentProperties == null) { + columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); + segmentProperties = new SegmentProperties(columnInTable, columnCardinality); + //createSchema(segmentProperties); + } + TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); + if (fileFooter.getBlockletList() == null || fileFooter.getBlockletList().size() == 0) { +// LOGGER +// .info("Reading carbondata file footer to get blocklet info " + blockInfo.getFilePath()); + fileFooter = CarbonUtil.readMetadatFile(blockInfo); + } + + loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath()); + } + if (unsafeMemoryDMStore != null) { + unsafeMemoryDMStore.finishWriting(); + } +// 
LOGGER.info("Time taken to load blocklet datamap from file : " + filePath + "is " + +// (System.currentTimeMillis() - startTime)); + + } + + @Override public List prune(FilterResolverIntf filterExp) { + return null; + } + + @Override public void clear() { + + } + + public void updateMinMaxIndex(String filePath) throws IOException, MemoryException { + long startTime = System.currentTimeMillis(); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + List indexInfo = fileFooterConverter.getIndexInfo(filePath); + for (DataFileFooter fileFooter : indexInfo) { + List columnInTable = fileFooter.getColumnInTable(); + if (segmentProperties == null) { + columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); + segmentProperties = new SegmentProperties(columnInTable, columnCardinality); + createSchema(segmentProperties); + } + TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); + if (fileFooter.getBlockletList() == null || fileFooter.getBlockletList().size() == 0) { +// LOGGER +// .info("Reading carbondata file footer to get blocklet info " + blockInfo.getFilePath()); + fileFooter = CarbonUtil.readMetadatFile(blockInfo); + } + + loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath()); + } + if (unsafeMemoryDMStore != null) { + unsafeMemoryDMStore.finishWriting(); + } +// LOGGER.info("Time taken to load blocklet datamap from file : " + filePath + "is " + +// (System.currentTimeMillis() - startTime)); + + } + + private void createSchema(SegmentProperties segmentProperties) throws MemoryException { + + // 1.Schema Contains + // a. Min + // b. Max + // c. Block path + // d. Blocklet info. + List indexSchemas = new ArrayList<>(); + + int[] minMaxLen = segmentProperties.getColumnsValueSize(); + // do it 2 times, one for min and one for max. 
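+    // Resulting ordinal layout of the rows later built in loadToUnsafe:
+    // [0] min-value struct, [1] max-value struct, [2] file path, [3] serialized BlockletInfo.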
+ for (int k = 0; k < 2; k++) { + DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length]; + for (int i = 0; i < minMaxLen.length; i++) { + if (minMaxLen[i] <= 0) { + mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY); + } else { + mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataType.BYTE_ARRAY, minMaxLen[i]); + } + } + DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(DataType.STRUCT, mapSchemas); + indexSchemas.add(mapSchema); + } + // for table block path + indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY)); + + //for blocklet info + indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY)); + + unsafeMemoryDMStore = + new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()])); + } + + private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties, + String filePath) { + int[] minMaxLen = segmentProperties.getColumnsValueSize(); + List blockletList = fileFooter.getBlockletList(); + DataMapSchema[] schema = unsafeMemoryDMStore.getSchema(); + for (int index = 0; index < blockletList.size(); index++) { + DataMapRow row = new DataMapRowImpl(schema); + int ordinal = 0; + BlockletInfo blockletInfo = blockletList.get(index); + + BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex(); + row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal); + ordinal++; + row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal); + ordinal++; + + // add file path + byte[] filePathBytes = + filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); + row.setByteArray(filePathBytes, ordinal++); + + // add blocklet info + byte[] serializedData; + try { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutput dataOutput = new DataOutputStream(stream); + blockletInfo.write(dataOutput); + serializedData = stream.toByteArray(); + row.setByteArray(serializedData, ordinal); + unsafeMemoryDMStore.addIndexRowToUnsafe(row); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] minValues) { + DataMapSchema[] minSchemas = + ((DataMapSchema.StructDataMapSchema) dataMapSchema).getChildSchemas(); + DataMapRow minRow = new DataMapRowImpl(minSchemas); + int minOrdinal = 0; + // min value adding + for (int i = 0; i < minMaxLen.length; i++) { + minRow.setByteArray(minValues[i], minOrdinal++); + } + return minRow; + } + +}
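
Note on the pruning step: DataMapMinMax.prune delegates the per-blocklet decision to FilterExecuter.isScanRequired, passing the stored max and min values. The core min/max check can be illustrated with a small, self-contained Java sketch. Everything below is illustrative only: MinMaxPruner, scanRequired and compareUnsigned are invented names, the unsigned lexicographic comparison is an assumption about how the stored min/max byte arrays are ordered, and no CarbonData classes are used.

import java.nio.charset.StandardCharsets;

// Illustrative only: decides whether a blocklet must be scanned for an equal-to filter,
// given the min and max values stored for one column of that blocklet.
public final class MinMaxPruner {

  // Unsigned lexicographic byte[] comparison; a shorter array that is a prefix sorts first.
  static int compareUnsigned(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  }

  // The blocklet has to be read only if the filter value lies inside [min, max].
  static boolean scanRequired(byte[] filterValue, byte[] min, byte[] max) {
    return compareUnsigned(filterValue, min) >= 0 && compareUnsigned(filterValue, max) <= 0;
  }

  public static void main(String[] args) {
    byte[] min = "aaa".getBytes(StandardCharsets.UTF_8);
    byte[] max = "mmm".getBytes(StandardCharsets.UTF_8);
    // "ccc" falls inside [min, max], so the blocklet cannot be pruned.
    System.out.println(scanRequired("ccc".getBytes(StandardCharsets.UTF_8), min, max)); // true
    // "zzz" is outside [min, max], so the blocklet can be skipped.
    System.out.println(scanRequired("zzz".getBytes(StandardCharsets.UTF_8), min, max)); // false
  }
}

In the patch itself this decision is made per blocklet by filterExecuter.isScanRequired(maxValues, minValues), and the blocklet is added to the pruned result only when the returned BitSet is non-empty.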