From 6b3bc0127ec010f77a3a84fa16944d9176060182 Mon Sep 17 00:00:00 2001 From: kunal642 Date: Tue, 5 Feb 2019 16:44:44 +0530 Subject: [PATCH] index cache --- bin/start-indexserver.sh | 56 ++++ bin/stop-indexserver.sh | 26 ++ .../carbondata/core/cache/CarbonLRUCache.java | 8 + .../core/constants/CarbonCommonConstants.java | 34 +++ .../core/datamap/DataMapChooser.java | 2 +- .../carbondata/core/datamap/DataMapJob.java | 4 +- .../core/datamap/DataMapStoreManager.java | 124 ++++++--- .../carbondata/core/datamap/DataMapUtil.java | 175 +++++++++--- .../datamap/DistributableDataMapFormat.java | 254 +++++++++++++++--- .../carbondata/core/datamap/Segment.java | 79 +++++- .../carbondata/core/datamap/TableDataMap.java | 47 +--- .../core/datamap/dev/DataMapFactory.java | 13 +- .../carbondata/core/indexstore/Blocklet.java | 24 +- .../indexstore/BlockletDetailsFetcher.java | 2 + .../core/indexstore/ExtendedBlocklet.java | 53 ++++ .../core/indexstore/PartitionSpec.java | 52 +++- ...ableBlockIndexUniqueIdentifierWrapper.java | 7 +- .../BlockletDataMapDistributable.java | 14 + .../blockletindex/BlockletDataMapFactory.java | 140 ++++++---- .../core/metadata/SegmentFileStore.java | 5 +- .../table/AggregationDataMapSchema.java | 5 +- .../metadata/schema/table/CarbonTable.java | 16 +- .../core/util/BlockletDataMapUtil.java | 2 +- .../core/util/CarbonProperties.java | 52 ++++ .../carbondata/core/util/SessionParams.java | 1 + .../carbondata/hadoop/CarbonInputSplit.java | 4 + .../bloom/BloomCoarseGrainDataMapFactory.java | 18 +- .../lucene/LuceneDataMapFactoryBase.java | 17 +- dev/findbugs-exclude.xml | 8 + .../hadoop/api/CarbonFileInputFormat.java | 4 +- .../hadoop/api/CarbonInputFormat.java | 212 +++++++++------ .../hadoop/api/CarbonTableInputFormat.java | 136 +++++----- .../hadoop/util/CarbonInputFormatUtil.java | 2 +- ...hColumnMetCacheAndCacheLevelProperty.scala | 4 +- .../testsuite/datamap/CGDataMapTestCase.scala | 8 - .../datamap/DataMapWriterSuite.scala | 2 - .../testsuite/datamap/FGDataMapTestCase.scala | 8 +- .../testsuite/datamap/TestDataMapStatus.scala | 2 - .../TestInsertAndOtherCommandConcurrent.scala | 2 - .../spark/util/CarbonScalaUtil.scala | 11 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../datamap/IndexDataMapProvider.java | 2 +- .../carbondata/indexserver/DataMapJobs.scala | 80 ++++++ .../indexserver/DistributedPruneRDD.scala} | 88 +++--- .../indexserver/DistributedRDDUtils.scala | 164 +++++++++++ .../indexserver/DistributedShowCacheRDD.scala | 58 ++++ .../carbondata/indexserver/IndexServer.scala | 178 ++++++++++++ .../InvalidateSegmentCacheRDD.scala | 54 ++++ .../spark/rdd/CarbonDataRDDFactory.scala | 7 + .../apache/spark/sql/CarbonCountStar.scala | 1 - .../events/MergeBloomIndexEventListener.scala | 5 - .../execution/command/cache/CacheUtil.scala | 15 +- .../cache/CarbonDropCacheCommand.scala | 21 +- .../cache/CarbonShowCacheCommand.scala | 56 +++- .../CarbonProjectForDeleteCommand.scala | 5 +- .../CarbonProjectForUpdateCommand.scala | 4 + .../command/mutation/DeleteExecution.scala | 20 +- .../command/CarbonHiveCommands.scala | 5 + 58 files changed, 1917 insertions(+), 481 deletions(-) create mode 100755 bin/start-indexserver.sh create mode 100755 bin/stop-indexserver.sh create mode 100644 integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala rename integration/{spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala => spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala} (51%) create mode 100644 
integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala create mode 100644 integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedShowCacheRDD.scala create mode 100644 integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala create mode 100644 integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala diff --git a/bin/start-indexserver.sh b/bin/start-indexserver.sh new file mode 100755 index 00000000000..e34516c53f7 --- /dev/null +++ b/bin/start-indexserver.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Shell script for starting the Distributed Index Server + +# Enter posix mode for bash +set -o posix + +if [ -z "${SPARK_HOME}" ]; then + export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" +fi + +# NOTE: This exact class name is matched downstream by SparkSubmit. +# Any changes need to be reflected there. +CLASS="org.apache.carbondata.indexserver.IndexServer" + +function usage { + echo "Usage: ./sbin/start-indexserver [options] [index server options]" + pattern="usage" + pattern+="\|Spark assembly has been built with Hive" + pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set" + pattern+="\|Spark Command: " + pattern+="\|=======" + pattern+="\|--help" + + "${SPARK_HOME}"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 + echo + echo "Thrift server options:" + "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2 +} + +if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then + usage + exit 0 +fi + +export SUBMIT_USAGE_FUNCTION=usage + +exec "${SPARK_HOME}"/sbin/spark-daemon.sh submit $CLASS 1 --name "DistributedIndexServer" "$@" diff --git a/bin/stop-indexserver.sh b/bin/stop-indexserver.sh new file mode 100755 index 00000000000..e024090c332 --- /dev/null +++ b/bin/stop-indexserver.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Stops the Distributed Index Server on the machine this script is executed on. + +if [ -z "${SPARK_HOME}" ]; then + export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" +fi + +"${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.carbondata.indexserver.IndexServer 1 diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java index 3371d0d90ab..2e2e3686f16 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java @@ -217,6 +217,7 @@ public boolean put(String columnIdentifier, Cacheable cacheInfo, long requiredSi } else { synchronized (lruCacheMap) { addEntryToLRUCacheMap(columnIdentifier, cacheInfo); + currentSize = currentSize + requiredSize; } columnKeyAddedSuccessfully = true; } @@ -358,4 +359,11 @@ private double getPartOfXmx() { long mSizeMB = Runtime.getRuntime().maxMemory() / BYTE_CONVERSION_CONSTANT; return mSizeMB * CarbonCommonConstants.CARBON_LRU_CACHE_PERCENT_OVER_MAX_SIZE; } + + /** + * @return current size of the cache in memory. + */ + public long getCurrentSize() { + return currentSize; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 608b5fbc0c5..43c56a6bca7 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -2124,4 +2124,38 @@ private CarbonCommonConstants() { */ public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL = "512"; + /** + * The IP on which Index Server will be started. + */ + @CarbonProperty + public static final String CARBON_INDEX_SERVER_IP = "carbon.index.server.ip"; + + /** + * The Port to be used to start Index Server. + */ + @CarbonProperty + public static final String CARBON_INDEX_SERVER_PORT = "carbon.index.server.port"; + + /** + * Whether to use index server for caching and pruning or not. + * This property can be used for + * 1. the whole application(carbon.properties). + * 2. the whole session(set carbon.enable.index.server) + * 3. a specific table for one session (set carbon.enable.index.server..) + */ + @CarbonProperty(dynamicConfigurable = true) + public static final String CARBON_ENABLE_INDEX_SERVER = "carbon.enable.index.server"; + + /** + * Property is used to enable/disable fallback for indexserver. + * Used for testing purposes only. 
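For illustration, the configuration knobs defined above can be exercised from a Spark session roughly as sketched below. This is a minimal sketch under stated assumptions, not part of the patch: the per-table key form carbon.enable.index.server.<dbName>.<tableName> is inferred from the "a specific table for one session" note in the javadoc, and the database/table names are placeholders.

import org.apache.spark.sql.SparkSession

// Sketch: enable distributed pruning via the index server for this session.
// carbon.index.server.ip and carbon.index.server.port would normally be set
// in carbon.properties on both the driver and the index-server machine.
val spark = SparkSession.builder().appName("index-server-config-sketch").getOrCreate()

// Session-wide switch; CARBON_ENABLE_INDEX_SERVER is declared
// dynamicConfigurable, so a SET command is the expected way to toggle it.
spark.sql("SET carbon.enable.index.server=true")

// Assumed per-table form for one session (placeholder db/table names).
spark.sql("SET carbon.enable.index.server.default.sample_table=true")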
+ */ + public static final String CARBON_DISABLE_INDEX_SERVER_FALLBACK = + "carbon.disable.index.server.fallback"; + + public static final String CARBON_INDEX_SERVER_WORKER_THREADS = + "carbon.index.server.max.worker.threads"; + + public static final int CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT = + 500; } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java index 3b6537c95c1..239401e944f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java @@ -120,7 +120,7 @@ public DataMapExprWrapper chooseCGDataMap(FilterResolverIntf resolverIntf) { return chooseDataMap(DataMapLevel.CG, resolverIntf); } - private DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf resolverIntf) { + DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf resolverIntf) { if (resolverIntf != null) { Expression expression = resolverIntf.getFilterExpression(); List datamaps = level == DataMapLevel.CG ? cgDataMaps : fgDataMaps; diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java index 57a739d50ba..9eafe7c635d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java @@ -22,7 +22,6 @@ import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -34,7 +33,6 @@ public interface DataMapJob extends Serializable { void execute(CarbonTable carbonTable, FileInputFormat format); - List execute(DistributableDataMapFormat dataMapFormat, - FilterResolverIntf filter); + List execute(DistributableDataMapFormat dataMapFormat); } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index a797b1113e5..81b1fb2e8e2 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -454,10 +454,11 @@ private TableDataMap getTableDataMap(String dataMapName, List tabl /** * Clear the invalid segments from all the datamaps of the table - * @param carbonTable - * @param segments + * + * @param carbonTable table for which the operation has to be performed. + * @param segments segments which have to be cleared from cache. 
*/ - public void clearInvalidSegments(CarbonTable carbonTable, List segments) + public void clearInvalidSegments(CarbonTable carbonTable, List segments) throws IOException { getDefaultDataMap(carbonTable).clear(segments); List allDataMap = getAllDataMap(carbonTable); @@ -467,6 +468,30 @@ public void clearInvalidSegments(CarbonTable carbonTable, List segments } + public List getSegmentsToBeRefreshed(CarbonTable carbonTable, + SegmentUpdateStatusManager updateStatusManager, List filteredSegmentToAccess) + throws IOException { + List toBeCleanedSegments = new ArrayList<>(); + for (Segment filteredSegment : filteredSegmentToAccess) { + boolean refreshNeeded = getTableSegmentRefresher(carbonTable).isRefreshNeeded(filteredSegment, + updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo())); + if (refreshNeeded) { + toBeCleanedSegments.add(filteredSegment.getSegmentNo()); + } + } + return toBeCleanedSegments; + } + + public void refreshSegmentCacheIfRequired(CarbonTable carbonTable, + SegmentUpdateStatusManager updateStatusManager, List filteredSegmentToAccess) + throws IOException { + List toBeCleanedSegments = + getSegmentsToBeRefreshed(carbonTable, updateStatusManager, filteredSegmentToAccess); + if (toBeCleanedSegments.size() > 0) { + clearInvalidSegments(carbonTable, toBeCleanedSegments); + } + } + /** * Clear the datamap/datamaps of a table from memory * @@ -483,29 +508,44 @@ public void clearDataMaps(AbsoluteTableIdentifier identifier) { */ public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean launchJob) { String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName(); - List tableIndices = allDataMaps.get(tableUniqueName); - if (tableIndices == null) { - String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath()); - if (keyUsingTablePath != null) { - tableUniqueName = keyUsingTablePath; - tableIndices = allDataMaps.get(tableUniqueName); - } - } - if (launchJob && tableIndices != null) { - CarbonTable carbonTable = getCarbonTable(identifier); + CarbonTable carbonTable = getCarbonTable(identifier); + if (launchJob && CarbonProperties.getInstance() + .isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName())) { if (null != carbonTable) { try { - DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable); + DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME); } catch (IOException e) { LOGGER.error("clear dataMap job failed", e); // ignoring the exception } } + } else { + List tableIndices = allDataMaps.get(tableUniqueName); + if (tableIndices == null) { + String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath()); + if (keyUsingTablePath != null) { + tableUniqueName = keyUsingTablePath; + } + } + if (launchJob && null != carbonTable) { + try { + DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.EMBEDDED_JOB_NAME); + } catch (IOException e) { + LOGGER.error("clear dataMap job failed", e); + // ignoring the exception + } + } + // remove carbon table from meta cache if launchJob is false as this would be called in + // executor side. 
+ if (!launchJob) { + CarbonMetadata.getInstance() + .removeTable(identifier.getDatabaseName(), identifier.getTableName()); + } + segmentRefreshMap.remove(identifier.uniqueName()); + clearDataMaps(tableUniqueName); + allDataMaps.remove(tableUniqueName); + tablePathMap.remove(tableUniqueName); } - segmentRefreshMap.remove(identifier.uniqueName()); - clearDataMaps(tableUniqueName); - allDataMaps.remove(tableUniqueName); - tablePathMap.remove(tableUniqueName); } /** @@ -554,29 +594,41 @@ public void clearDataMaps(String tableUniqName) { * * @param identifier Table identifier */ - public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) { + public void deleteDataMap(AbsoluteTableIdentifier identifier, String dataMapName) { CarbonTable carbonTable = getCarbonTable(identifier); String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName(); - List tableIndices = allDataMaps.get(tableUniqueName); - if (tableIndices != null) { - int i = 0; - for (TableDataMap tableDataMap : tableIndices) { - if (carbonTable != null && tableDataMap != null && dataMapName - .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) { - try { - DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable); - tableDataMap.clear(); - } catch (IOException e) { - LOGGER.error("clear dataMap job failed", e); - // ignoring the exception + if (CarbonProperties.getInstance() + .isDistributedPruningEnabled(identifier.getDatabaseName(), identifier.getTableName())) { + try { + DataMapUtil + .executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME, dataMapName); + } catch (IOException e) { + LOGGER.error("clear dataMap job failed", e); + // ignoring the exception + } + } else { + List tableIndices = allDataMaps.get(tableUniqueName); + if (tableIndices != null) { + int i = 0; + for (TableDataMap tableDataMap : tableIndices) { + if (carbonTable != null && tableDataMap != null && dataMapName + .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) { + try { + DataMapUtil + .executeClearDataMapJob(carbonTable, DataMapUtil.EMBEDDED_JOB_NAME, dataMapName); + tableDataMap.clear(); + } catch (IOException e) { + LOGGER.error("clear dataMap job failed", e); + // ignoring the exception + } + tableDataMap.deleteDatamapData(); + tableIndices.remove(i); + break; } - tableDataMap.deleteDatamapData(); - tableIndices.remove(i); - break; + i++; } - i++; + allDataMaps.put(tableUniqueName, tableIndices); } - allDataMaps.put(tableUniqueName, tableIndices); } } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java index bea1cca87ce..95c69c1dbfe 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java @@ -18,10 +18,14 @@ package org.apache.carbondata.core.datamap; import java.io.IOException; -import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; @@ -30,6 +34,7 @@ import 
org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.ObjectSerializationUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; @@ -38,6 +43,12 @@ public class DataMapUtil { private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr"; + public static final String EMBEDDED_JOB_NAME = + "org.apache.carbondata.indexserver.EmbeddedDataMapJob"; + + public static final String DISTRIBUTED_JOB_NAME = + "org.apache.carbondata.indexserver.DistributedDataMapJob"; + private static final Logger LOGGER = LogServiceFactory.getLogService(DataMapUtil.class.getName()); @@ -91,43 +102,110 @@ public static DataMapJob getDataMapJob(Configuration configuration) throws IOExc * @param carbonTable * @throws IOException */ - public static void executeDataMapJobForClearingDataMaps(CarbonTable carbonTable) + private static void executeClearDataMapJob(DataMapJob dataMapJob, + CarbonTable carbonTable, String dataMapToClear) throws IOException { + SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo = + getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration()); + List invalidSegment = new ArrayList<>(); + for (Segment segment : validAndInvalidSegmentsInfo.getInvalidSegments()) { + invalidSegment.add(segment.getSegmentNo()); + } + DistributableDataMapFormat dataMapFormat = new DistributableDataMapFormat(carbonTable, + validAndInvalidSegmentsInfo.getValidSegments(), invalidSegment, true, + dataMapToClear); + dataMapJob.execute(dataMapFormat); + } + + public static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName) throws IOException { - String dataMapJobClassName = "org.apache.carbondata.spark.rdd.SparkDataMapJob"; - DataMapJob dataMapJob = (DataMapJob) createDataMapJob(dataMapJobClassName); + executeClearDataMapJob(carbonTable, jobClassName, ""); + } + + static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName, + String dataMapToClear) throws IOException { + DataMapJob dataMapJob = (DataMapJob) createDataMapJob(jobClassName); if (dataMapJob == null) { return; } - String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat"; - SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo = - getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration()); - List validSegments = validAndInvalidSegmentsInfo.getValidSegments(); - List invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments(); - DataMapExprWrapper dataMapExprWrapper = null; - if (DataMapStoreManager.getInstance().getAllDataMap(carbonTable).size() > 0) { - DataMapChooser dataMapChooser = new DataMapChooser(carbonTable); - dataMapExprWrapper = dataMapChooser.getAllDataMapsForClear(carbonTable); - } else { - return; + executeClearDataMapJob(dataMapJob, carbonTable, dataMapToClear); + } + + public static DataMapJob getEmbeddedJob() { + DataMapJob dataMapJob = (DataMapJob) DataMapUtil.createDataMapJob(EMBEDDED_JOB_NAME); + if (dataMapJob == null) { + throw new ExceptionInInitializerError("Unable to create EmbeddedDataMapJob"); } - DistributableDataMapFormat dataMapFormat = - createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments, null, - className, true); - dataMapJob.execute(dataMapFormat, null); + return dataMapJob; } - private static DistributableDataMapFormat 
createDataMapJob(CarbonTable carbonTable, - DataMapExprWrapper dataMapExprWrapper, List validsegments, - List invalidSegments, List partitionsToPrune, String clsName, - boolean isJobToClearDataMaps) { - try { - Constructor cons = Class.forName(clsName).getDeclaredConstructors()[0]; - return (DistributableDataMapFormat) cons - .newInstance(carbonTable, dataMapExprWrapper, validsegments, invalidSegments, - partitionsToPrune, isJobToClearDataMaps); - } catch (Exception e) { - throw new RuntimeException(e); + /** + * Prune the segments from the already pruned blocklets. + */ + public static void pruneSegments(List segments, List prunedBlocklets) { + Set validSegments = new HashSet<>(); + for (ExtendedBlocklet blocklet : prunedBlocklets) { + // Clear the old pruned index files if any present + blocklet.getSegment().getFilteredIndexShardNames().clear(); + // Set the pruned index file to the segment + // for further pruning. + String shardName = CarbonTablePath.getShardName(blocklet.getFilePath()); + blocklet.getSegment().setFilteredIndexShardName(shardName); + validSegments.add(blocklet.getSegment()); } + segments.clear(); + segments.addAll(validSegments); + } + + static List pruneDataMaps(CarbonTable table, + FilterResolverIntf filterResolverIntf, List segmentsToLoad, + List partitions, List blocklets) throws IOException { + pruneSegments(segmentsToLoad, blocklets); + List cgDataMaps = pruneDataMaps(table, filterResolverIntf, segmentsToLoad, + partitions, blocklets, + DataMapLevel.CG); + pruneSegments(segmentsToLoad, cgDataMaps); + return pruneDataMaps(table, filterResolverIntf, segmentsToLoad, + partitions, cgDataMaps, + DataMapLevel.FG); + } + + static List pruneDataMaps(CarbonTable table, + FilterResolverIntf filterResolverIntf, List segmentsToLoad, + List partitions, List blocklets, DataMapLevel dataMapLevel) + throws IOException { + DataMapExprWrapper dataMapExprWrapper = + new DataMapChooser(table).chooseDataMap(dataMapLevel, filterResolverIntf); + if (dataMapExprWrapper != null) { + List extendedBlocklets = new ArrayList<>(); + // Prune segments from already pruned blocklets + for (DataMapDistributableWrapper wrapper : dataMapExprWrapper + .toDistributable(segmentsToLoad)) { + TableDataMap dataMap = DataMapStoreManager.getInstance() + .getDataMap(table, wrapper.getDistributable().getDataMapSchema()); + List dataMaps = dataMap.getTableDataMaps(wrapper.getDistributable()); + List prunnedBlocklet = new ArrayList<>(); + if (table.isTransactionalTable()) { + prunnedBlocklet.addAll(dataMap.prune(dataMaps, wrapper.getDistributable(), + dataMapExprWrapper.getFilterResolverIntf(wrapper.getUniqueId()), partitions)); + } else { + prunnedBlocklet + .addAll(dataMap.prune(segmentsToLoad, new DataMapFilter(filterResolverIntf), + partitions)); + } + // For all blocklets initialize the detail info so that it can be serialized to the driver. + for (ExtendedBlocklet blocklet : prunnedBlocklet) { + blocklet.getDetailInfo(); + blocklet.setDataMapUniqueId(wrapper.getUniqueId()); + } + extendedBlocklets.addAll(prunnedBlocklet); + } + return dataMapExprWrapper.pruneBlocklets(extendedBlocklets); + } + // For all blocklets initialize the detail info so that it can be serialized to the driver. 
+ for (ExtendedBlocklet blocklet : blocklets) { + blocklet.getDetailInfo(); + } + return blocklets; } /** @@ -136,23 +214,36 @@ private static DistributableDataMapFormat createDataMapJob(CarbonTable carbonTab * @return list of Extended blocklets after pruning */ public static List executeDataMapJob(CarbonTable carbonTable, - FilterResolverIntf resolver, List validSegments, - DataMapExprWrapper dataMapExprWrapper, DataMapJob dataMapJob, - List partitionsToPrune) throws IOException { - String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat"; - SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo = - getValidAndInvalidSegments(carbonTable, validSegments.get(0).getConfiguration()); - List invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments(); + FilterResolverIntf resolver, DataMapJob dataMapJob, List partitionsToPrune, + List validSegments, List invalidSegments, DataMapLevel level, + List segmentsToBeRefreshed) throws IOException { + return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments, + invalidSegments, level, false, segmentsToBeRefreshed); + } + + /** + * this method gets the datamapJob and call execute of that job, this will be launched for + * distributed CG or FG + * @return list of Extended blocklets after pruning + */ + public static List executeDataMapJob(CarbonTable carbonTable, + FilterResolverIntf resolver, DataMapJob dataMapJob, List partitionsToPrune, + List validSegments, List invalidSegments, DataMapLevel level, + Boolean isFallbackJob, List segmentsToBeRefreshed) throws IOException { + List invalidSegmentNo = new ArrayList<>(); + for (Segment segment : invalidSegments) { + invalidSegmentNo.add(segment.getSegmentNo()); + } + invalidSegmentNo.addAll(segmentsToBeRefreshed); DistributableDataMapFormat dataMapFormat = - createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments, - partitionsToPrune, className, false); - List prunedBlocklets = dataMapJob.execute(dataMapFormat, resolver); + new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo, + partitionsToPrune, false, level, isFallbackJob); + List prunedBlocklets = dataMapJob.execute(dataMapFormat); // Apply expression on the blocklets. 
- prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets); return prunedBlocklets; } - private static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments( + public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments( CarbonTable carbonTable, Configuration configuration) throws IOException { SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration); diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java index 4c2300812e2..f76cfeceb6b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java @@ -16,62 +16,102 @@ */ package org.apache.carbondata.core.datamap; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; +import java.nio.charset.Charset; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; -import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.readcommitter.ReadCommittedScope; +import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.ObjectSerializationUtil; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.log4j.Logger; /** * Input format for datamaps, it makes the datamap pruning distributable. 
*/ -public class DistributableDataMapFormat extends FileInputFormat implements - Serializable { +public class DistributableDataMapFormat extends FileInputFormat + implements Serializable, Writable { + + private static final transient Logger LOGGER = + LogServiceFactory.getLogService(DistributableDataMapFormat.class.getName()); + + private static final long serialVersionUID = 9189779090091151248L; private CarbonTable table; - private DataMapExprWrapper dataMapExprWrapper; + private FilterResolverIntf filterResolverIntf; private List validSegments; - private List invalidSegments; + private List invalidSegments; private List partitions; - private DataMapDistributableWrapper distributable; - private boolean isJobToClearDataMaps = false; - DistributableDataMapFormat(CarbonTable table, DataMapExprWrapper dataMapExprWrapper, - List validSegments, List invalidSegments, List partitions, - boolean isJobToClearDataMaps) { + private DataMapLevel dataMapLevel; + + private boolean isFallbackJob = false; + + private String dataMapToClear = ""; + + private ReadCommittedScope readCommittedScope; + + DistributableDataMapFormat() { + + } + + DistributableDataMapFormat(CarbonTable table, + List validSegments, List invalidSegments, boolean isJobToClearDataMaps, + String dataMapToClear) throws IOException { + this(table, null, validSegments, invalidSegments, null, + isJobToClearDataMaps, null, false); + this.dataMapToClear = dataMapToClear; + } + + DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf, + List validSegments, List invalidSegments, List partitions, + boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob) + throws IOException { this.table = table; - this.dataMapExprWrapper = dataMapExprWrapper; + this.filterResolverIntf = filterResolverIntf; this.validSegments = validSegments; + if (!validSegments.isEmpty()) { + this.readCommittedScope = validSegments.get(0).getReadCommittedScope(); + } this.invalidSegments = invalidSegments; this.partitions = partitions; this.isJobToClearDataMaps = isJobToClearDataMaps; + this.dataMapLevel = dataMapLevel; + this.isFallbackJob = isFallbackJob; } @Override public List getSplits(JobContext job) throws IOException { - List distributables = - dataMapExprWrapper.toDistributable(validSegments); + List distributables; + distributables = + DataMapChooser.getDefaultDataMap(table, filterResolverIntf).toDistributable(validSegments); List inputSplits = new ArrayList<>(distributables.size()); inputSplits.addAll(distributables); return inputSplits; @@ -85,33 +125,67 @@ public RecordReader createRecordReader(InputSplit inputS private ExtendedBlocklet currBlocklet; private List dataMaps; - @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - distributable = (DataMapDistributableWrapper) inputSplit; - // clear the segmentMap and from cache in executor when there are invalid segments - if (invalidSegments.size() > 0) { - DataMapStoreManager.getInstance().clearInvalidSegments(table, invalidSegments); - } - TableDataMap tableDataMap = DataMapStoreManager.getInstance() - .getDataMap(table, distributable.getDistributable().getDataMapSchema()); + DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit; + distributable.getDistributable().getSegment().setCacheable(!isFallbackJob); + 
distributable.getDistributable().getSegment().setReadCommittedScope(readCommittedScope); + List segmentsToLoad = new ArrayList<>(); + segmentsToLoad.add(distributable.getDistributable().getSegment()); if (isJobToClearDataMaps) { - // if job is to clear datamaps just clear datamaps from cache and return - DataMapStoreManager.getInstance() - .clearDataMaps(table.getCarbonTableIdentifier().getTableUniqueName()); - // clear the segment properties cache from executor - SegmentPropertiesAndSchemaHolder.getInstance() - .invalidate(table.getAbsoluteTableIdentifier()); - blockletIterator = Collections.emptyIterator(); + if (StringUtils.isNotEmpty(dataMapToClear)) { + List dataMaps = + DataMapStoreManager.getInstance().getAllDataMap(table); + int i = 0; + for (TableDataMap tableDataMap : dataMaps) { + if (tableDataMap != null && dataMapToClear + .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) { + tableDataMap.deleteSegmentDatamapData( + ((DataMapDistributableWrapper) inputSplit).getDistributable().getSegment() + .getSegmentNo()); + tableDataMap.clear(); + dataMaps.remove(i); + break; + } + i++; + } + DataMapStoreManager.getInstance().getAllDataMaps().put(table.getTableUniqueName(), + dataMaps); + } else { + // if job is to clear datamaps just clear datamaps from cache and return + DataMapStoreManager.getInstance() + .clearDataMaps(table.getCarbonTableIdentifier().getTableUniqueName()); + // clear the segment properties cache from executor + SegmentPropertiesAndSchemaHolder.getInstance() + .invalidate(table.getAbsoluteTableIdentifier()); + } + List list = new ArrayList(); + list.add(new ExtendedBlocklet()); + blockletIterator = list.iterator(); return; + } else if (invalidSegments.size() > 0) { + // clear the segmentMap and from cache in executor when there are invalid segments + DataMapStoreManager.getInstance().clearInvalidSegments(table, invalidSegments); } - dataMaps = tableDataMap.getTableDataMaps(distributable.getDistributable()); - List blocklets = tableDataMap - .prune(dataMaps, - distributable.getDistributable(), - dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions); - for (ExtendedBlocklet blocklet : blocklets) { - blocklet.getDetailInfo(); - blocklet.setDataMapUniqueId(distributable.getUniqueId()); + List blocklets = new ArrayList<>(); + if (dataMapLevel == null) { + TableDataMap defaultDataMap = DataMapStoreManager.getInstance() + .getDataMap(table, distributable.getDistributable().getDataMapSchema()); + dataMaps = defaultDataMap.getTableDataMaps(distributable.getDistributable()); + if (table.isTransactionalTable()) { + blocklets = defaultDataMap.prune(dataMaps, distributable.getDistributable(), + filterResolverIntf, partitions); + } else { + blocklets = defaultDataMap.prune(segmentsToLoad, new DataMapFilter(filterResolverIntf), + partitions); + } + blocklets = DataMapUtil + .pruneDataMaps(table, filterResolverIntf, segmentsToLoad, partitions, blocklets); + } else { + blocklets = DataMapUtil + .pruneDataMaps(table, filterResolverIntf, segmentsToLoad, partitions, blocklets, + dataMapLevel); } blockletIterator = blocklets.iterator(); } @@ -154,4 +228,110 @@ public void close() throws IOException { }; } + public CarbonTable getCarbonTable() { + return table; + } + + @Override + public void write(DataOutput out) throws IOException { + table.write(out); + out.writeInt(invalidSegments.size()); + for (String invalidSegment : invalidSegments) { + out.writeUTF(invalidSegment); + } + out.writeBoolean(isJobToClearDataMaps); + 
out.writeBoolean(isFallbackJob); + if (dataMapLevel == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(dataMapLevel.name()); + } + out.writeInt(validSegments.size()); + for (Segment segment : validSegments) { + segment.write(out); + } + if (partitions == null) { + out.writeBoolean(false); + } else { + out.writeInt(partitions.size()); + for (PartitionSpec partitionSpec : partitions) { + partitionSpec.write(out); + } + } + if (filterResolverIntf != null) { + out.writeBoolean(true); + byte[] filterResolverBytes = ObjectSerializationUtil.convertObjectToString(filterResolverIntf) + .getBytes(Charset.defaultCharset()); + out.writeInt(filterResolverBytes.length); + out.write(filterResolverBytes); + + } else { + out.writeBoolean(false); + } + out.writeUTF(dataMapToClear); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.table = new CarbonTable(); + table.readFields(in); + int invalidSegmentSize = in.readInt(); + invalidSegments = new ArrayList<>(invalidSegmentSize); + for (int i = 0; i < invalidSegmentSize; i++) { + invalidSegments.add(in.readUTF()); + } + this.isJobToClearDataMaps = in.readBoolean(); + this.isFallbackJob = in.readBoolean(); + if (in.readBoolean()) { + this.dataMapLevel = DataMapLevel.valueOf(in.readUTF()); + } + int validSegmentSize = in.readInt(); + validSegments = new ArrayList<>(validSegmentSize); + initReadCommittedScope(); + for (int i = 0; i < validSegmentSize; i++) { + Segment segment = new Segment(); + segment.setReadCommittedScope(readCommittedScope); + segment.readFields(in); + validSegments.add(segment); + } + if (in.readBoolean()) { + int numPartitions = in.readInt(); + partitions = new ArrayList<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + PartitionSpec partitionSpec = new PartitionSpec(); + partitionSpec.readFields(in); + partitions.add(partitionSpec); + } + } + if (in.readBoolean()) { + byte[] filterResolverBytes = new byte[in.readInt()]; + in.readFully(filterResolverBytes, 0, filterResolverBytes.length); + this.filterResolverIntf = (FilterResolverIntf) ObjectSerializationUtil + .convertStringToObject(new String(filterResolverBytes, Charset.defaultCharset())); + } + this.dataMapToClear = in.readUTF(); + } + + private void initReadCommittedScope() throws IOException { + if (readCommittedScope == null) { + this.readCommittedScope = + new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(), + FileFactory.getConfiguration()); + } + } + + /** + * @return Whether the job is fallback or not. + */ + public boolean isFallbackJob() { + return isFallbackJob; + } + + /** + * @return Whether the job is to clear cached datamaps or not. 
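To make the InputFormat side of this class concrete, the sketch below shows one way a driver-local DataMapJob could consume DistributableDataMapFormat through the standard Hadoop contract (getSplits followed by createRecordReader). It is an illustration only, written against the assumed generic types <Void, ExtendedBlocklet>; the DistributedDataMapJob and EmbeddedDataMapJob that this patch actually adds live in DataMapJobs.scala and are not reproduced here.

import java.util
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID}
import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
import org.apache.carbondata.core.datamap.DistributableDataMapFormat
import org.apache.carbondata.core.indexstore.ExtendedBlocklet

// Sketch: prune every split in the current JVM instead of sending it to the
// index server. Each value produced by the record reader is a pruned blocklet.
def pruneLocally(format: DistributableDataMapFormat): util.List[ExtendedBlocklet] = {
  val conf = new Configuration()
  val splits = format.getSplits(new JobContextImpl(conf, new JobID())).asScala
  val result = new util.ArrayList[ExtendedBlocklet]()
  for (split <- splits) {
    val context = new TaskAttemptContextImpl(conf, new TaskAttemptID())
    val reader = format.createRecordReader(split, context)
    // The pruning itself happens inside initialize() of the reader returned above.
    reader.initialize(split, context)
    while (reader.nextKeyValue()) {
      result.add(reader.getCurrentValue)
    }
    reader.close()
  }
  result
}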
+ */ + public boolean isJobToClearDataMaps() { + return isJobToClearDataMaps; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java index 4797b533c8f..9370be83c67 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -16,6 +16,8 @@ */ package org.apache.carbondata.core.datamap; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -25,6 +27,7 @@ import java.util.Objects; import java.util.Set; +import org.apache.carbondata.core.metadata.schema.table.Writable; import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; @@ -37,7 +40,7 @@ /** * Represents one load of carbondata */ -public class Segment implements Serializable { +public class Segment implements Serializable, Writable { private static final long serialVersionUID = 7044555408162234064L; @@ -55,15 +58,26 @@ public class Segment implements Serializable { * transactional isolation level which only allows snapshot read of the * data and make non committed data invisible to the reader. */ - private ReadCommittedScope readCommittedScope; + private transient ReadCommittedScope readCommittedScope; /** * keeps all the details about segments */ - private LoadMetadataDetails loadMetadataDetails; + private transient LoadMetadataDetails loadMetadataDetails; private String segmentString; + private long indexSize = 0; + + /** + * Whether to cache the segment data maps in executors or not. + */ + private boolean isCacheable = true; + + public Segment() { + + } + public Segment(String segmentNo) { this.segmentNo = segmentNo; } @@ -120,6 +134,9 @@ public Segment(String segmentNo, String segmentFileName, ReadCommittedScope read this.segmentFileName = segmentFileName; this.readCommittedScope = readCommittedScope; this.loadMetadataDetails = loadMetadataDetails; + if (loadMetadataDetails.getIndexSize() != null) { + this.indexSize = Long.parseLong(loadMetadataDetails.getIndexSize()); + } if (segmentFileName != null) { segmentString = segmentNo + "#" + segmentFileName; } else { @@ -155,6 +172,10 @@ public void setReadCommittedScope(ReadCommittedScope readCommittedScope) { this.readCommittedScope = readCommittedScope; } + public ReadCommittedScope getReadCommittedScope() { + return readCommittedScope; + } + public static List toSegmentList(String[] segmentIds, ReadCommittedScope readCommittedScope) { List list = new ArrayList<>(segmentIds.length); @@ -257,4 +278,56 @@ public void setFilteredIndexShardName(String filteredIndexShardName) { public LoadMetadataDetails getLoadMetadataDetails() { return loadMetadataDetails; } + + public long getIndexSize() { + return indexSize; + } + + public void setIndexSize(long indexSize) { + this.indexSize = indexSize; + } + + public boolean isCacheable() { + return isCacheable; + } + + public void setCacheable(boolean cacheable) { + isCacheable = cacheable; + } + + @Override public void write(DataOutput out) throws IOException { + out.writeUTF(segmentNo); + boolean writeSegmentFileName = segmentFileName != null; + out.writeBoolean(writeSegmentFileName); + if (writeSegmentFileName) { + out.writeUTF(segmentFileName); + } + out.writeInt(filteredIndexShardNames.size()); + for (String name: 
filteredIndexShardNames) { + out.writeUTF(name); + } + if (segmentString == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(segmentString); + } + out.writeLong(indexSize); + } + + @Override public void readFields(DataInput in) throws IOException { + this.segmentNo = in.readUTF(); + if (in.readBoolean()) { + this.segmentFileName = in.readUTF(); + } + filteredIndexShardNames = new HashSet<>(); + int indexShardNameSize = in.readInt(); + for (int i = 0; i < indexShardNameSize; i++) { + filteredIndexShardNames.add(in.readUTF()); + } + if (in.readBoolean()) { + this.segmentString = in.readUTF(); + } + this.indexSize = in.readLong(); + } } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 4375abbca1f..bc87298fcd5 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -356,14 +356,7 @@ private List addSegmentId(List pruneBlocklet public List toDistributable(List segments) throws IOException { List distributables = new ArrayList<>(); for (Segment segment : segments) { - List list = - dataMapFactory.toDistributable(segment); - for (DataMapDistributable distributable : list) { - distributable.setDataMapSchema(dataMapSchema); - distributable.setSegment(segment); - distributable.setTablePath(identifier.getTablePath()); - } - distributables.addAll(list); + distributables.addAll(dataMapFactory.toDistributable(segment)); } return distributables; } @@ -420,10 +413,10 @@ public List prune(List dataMaps, DataMapDistributable /** * Clear only the datamaps of the segments - * @param segments + * @param segmentIds list of segmentIds to be cleared from cache. 
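Because Segment now implements Writable, it can be round-tripped through a plain DataOutput/DataInput pair, which is what DistributableDataMapFormat.write and readFields rely on. A minimal sketch, assuming carbondata-core on the classpath; note that readCommittedScope and loadMetadataDetails are transient, so the receiving side has to re-attach a ReadCommittedScope (the format does this via setReadCommittedScope before calling segment.readFields).

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.carbondata.core.datamap.Segment

// Sketch: serialize and restore a segment the same way the format does.
val original = new Segment("2")   // segment number only, no segment file
original.setIndexSize(1024L)

val buffer = new ByteArrayOutputStream()
val out = new DataOutputStream(buffer)
original.write(out)
out.flush()

val restored = new Segment()
restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
// restored.getSegmentNo is "2" and restored.getIndexSize is 1024; the
// read-committed scope is not carried over and must be set again if needed.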
*/ - public void clear(List segments) { - for (Segment segment: segments) { + public void clear(List segmentIds) { + for (String segment: segmentIds) { dataMapFactory.clear(segment); } } @@ -452,6 +445,13 @@ public void deleteDatamapData() { dataMapFactory.deleteDatamapData(); } + /** + * delete datamap data for a segment if any + */ + public void deleteSegmentDatamapData(String segmentNo) throws IOException { + dataMapFactory.deleteSegmentDatamapData(segmentNo); + } + public DataMapSchema getDataMapSchema() { return dataMapSchema; } @@ -464,31 +464,6 @@ public DataMapFactory getDataMapFactory() { dataMapFactory.fireEvent(event); } - /** - * Method to prune the segments based on task min/max values - * - * @param segments - * @param filterExp - * @return - * @throws IOException - */ - public List pruneSegments(List segments, FilterResolverIntf filterExp) - throws IOException { - List prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - for (Segment segment : segments) { - List dataMaps = dataMapFactory.getDataMaps(segment); - for (DataMap dataMap : dataMaps) { - if (dataMap.isScanRequired(filterExp)) { - // If any one task in a given segment contains the data that means the segment need to - // be scanned and we need to validate further data maps in the same segment - prunedSegments.add(segment); - break; - } - } - } - return prunedSegments; - } - /** * Prune the datamap of the given segments and return the Map of blocklet path and row count * diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index ee7914df7f3..bc414f9f568 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -111,9 +111,11 @@ public abstract List getDataMaps(DataMapDistributable distributable) public abstract void fireEvent(Event event); /** - * Clears datamap of the segment + * Clear all datamaps for a segment from memory */ - public abstract void clear(Segment segment); + public void clear(String segmentNo) { + + } /** * Clear all datamaps from memory @@ -140,6 +142,13 @@ public abstract List getDataMaps(DataMapDistributable distributable) */ public abstract void deleteDatamapData(); + /** + * delete datamap data if any + */ + public void deleteSegmentDatamapData(String segmentNo) throws IOException { + + } + /** * This function should return true is the input operation enum will make the datamap become stale */ diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java index 3270d086303..9aeb6c44552 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.io.Serializable; -import org.apache.carbondata.core.metadata.schema.table.Writable; +import org.apache.hadoop.io.Writable; /** * Blocklet @@ -67,14 +67,28 @@ public String getFilePath() { @Override public void write(DataOutput out) throws IOException { - out.writeUTF(filePath); - out.writeUTF(blockletId); + if (filePath == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(filePath); + } + if (blockletId == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(blockletId); + 
} } @Override public void readFields(DataInput in) throws IOException { - filePath = in.readUTF(); - blockletId = in.readUTF(); + if (in.readBoolean()) { + filePath = in.readUTF(); + } + if (in.readBoolean()) { + blockletId = in.readUTF(); + } } @Override diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java index 1971f40d486..d4d86d0c0c4 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java @@ -60,4 +60,6 @@ List getAllBlocklets(Segment segment, List partitions) * clears the datamap from cache and segmentMap from executor */ void clear(); + + String getCacheSize() throws IOException ; } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java index 3d6ceddfa2f..1de1ab5c856 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java @@ -16,6 +16,8 @@ */ package org.apache.carbondata.core.indexstore; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.List; @@ -34,6 +36,10 @@ public class ExtendedBlocklet extends Blocklet { private CarbonInputSplit inputSplit; + public ExtendedBlocklet() { + + } + public ExtendedBlocklet(String filePath, String blockletId, boolean compareBlockletIdForObjectMatching, ColumnarFormatVersion version) { super(filePath, blockletId, compareBlockletIdForObjectMatching); @@ -144,4 +150,51 @@ public void setColumnSchema(List columnSchema) { this.inputSplit.setColumnSchema(columnSchema); } + + + @Override public void write(DataOutput out) throws IOException { + super.write(out); + if (dataMapUniqueId == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(dataMapUniqueId); + } + if (inputSplit != null) { + out.writeBoolean(true); + inputSplit.write(out); + String[] locations = getLocations(); + if (locations != null) { + out.writeBoolean(true); + out.writeInt(locations.length); + for (String location : locations) { + out.writeUTF(location); + } + } else { + out.writeBoolean(false); + } + } else { + out.writeBoolean(false); + } + } + + @Override public void readFields(DataInput in) throws IOException { + super.readFields(in); + if (in.readBoolean()) { + dataMapUniqueId = in.readUTF(); + } + if (in.readBoolean()) { + inputSplit = new CarbonInputSplit(); + inputSplit.readFields(in); + if (in.readBoolean()) { + int numLocations = in.readInt(); + String[] locations = new String[numLocations]; + for (int i = 0; i < numLocations; i++) { + locations[i] = in.readUTF(); + } + inputSplit.setLocation(locations); + } + } + } + } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java index 87c875e6407..0d989cc8c40 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java @@ -16,19 +16,24 @@ */ package org.apache.carbondata.core.indexstore; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.io.Serializable; import java.net.URI; +import 
java.util.ArrayList; import java.util.List; import java.util.Objects; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; /** * Holds partition information. */ -public class PartitionSpec implements Serializable { +public class PartitionSpec implements Serializable, Writable { private static final long serialVersionUID = 4828007433384867678L; @@ -43,6 +48,10 @@ public class PartitionSpec implements Serializable { private String uuid; + public PartitionSpec() { + + } + public PartitionSpec(List partitions, String location) { this.partitions = partitions; this.locationPath = new Path(FileFactory.getUpdatedFilePath(location)); @@ -89,4 +98,45 @@ public void setUuid(String uuid) { return "PartitionSpec{" + "partitions=" + partitions + ", locationPath=" + locationPath + ", location='" + location + '\'' + '}'; } + + @Override public void write(DataOutput out) throws IOException { + if (partitions == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeInt(partitions.size()); + for (String partition : partitions) { + out.writeUTF(partition); + } + } + if (uuid == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(uuid); + } + if (location == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(location); + } + } + + @Override public void readFields(DataInput in) throws IOException { + if (in.readBoolean()) { + int numPartitions = in.readInt(); + partitions = new ArrayList<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + partitions.add(in.readUTF()); + } + } + if (in.readBoolean()) { + uuid = in.readUTF(); + } + if (in.readBoolean()) { + location = in.readUTF(); + } + } + } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java index b12519748f8..88554fcbc9a 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java @@ -53,7 +53,7 @@ public TableBlockIndexUniqueIdentifierWrapper( this.configuration = FileFactory.getConfiguration(); } - public TableBlockIndexUniqueIdentifierWrapper( + private TableBlockIndexUniqueIdentifierWrapper( TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable, Configuration configuration) { this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier; @@ -65,9 +65,8 @@ public TableBlockIndexUniqueIdentifierWrapper( // Kindly do not remove public TableBlockIndexUniqueIdentifierWrapper( TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable, - boolean addTableBlockToUnsafeAndLRUCache) { - this(tableBlockIndexUniqueIdentifier, carbonTable); - this.configuration = FileFactory.getConfiguration(); + Configuration configuration, boolean addTableBlockToUnsafeAndLRUCache) { + this(tableBlockIndexUniqueIdentifier, carbonTable, configuration); this.addTableBlockToUnsafeAndLRUCache = addTableBlockToUnsafeAndLRUCache; } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java index 7cdf77d546a..bb91eb0845b 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java @@ -32,8 +32,14 @@ public class BlockletDataMapDistributable extends DataMapDistributable { */ private String filePath; + private String segmentPath; + private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier; + public BlockletDataMapDistributable() { + + } + public BlockletDataMapDistributable(String indexFilePath) { this.filePath = indexFilePath; } @@ -50,4 +56,12 @@ public void setTableBlockIndexUniqueIdentifier( TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifiers) { this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifiers; } + + public String getSegmentPath() { + return segmentPath; + } + + public void setSegmentPath(String segmentPath) { + this.segmentPath = segmentPath; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 2ef7b8809bb..25eaf0c6450 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -23,7 +23,6 @@ import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.cache.CacheType; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.Segment; @@ -33,6 +32,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory; +import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -54,10 +54,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.events.Event; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; /** * Table map for blocklet @@ -134,7 +131,7 @@ public Map> getDataMaps(List segments for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { tableBlockIndexUniqueIdentifierWrappers.add( new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, - this.getCarbonTable(), segment.getConfiguration())); + this.getCarbonTable(), segment.getConfiguration(), segment.isCacheable())); } } List blockletDataMapIndexWrappers = @@ -257,29 +254,14 @@ private ExtendedBlocklet getExtendedBlocklet( public List toDistributable(Segment segment) { List distributables = new ArrayList<>(); try { - Set tableBlockIndexUniqueIdentifiers = - getTableBlockIndexUniqueIdentifiers(segment); - CarbonFile[] carbonIndexFiles = new CarbonFile[tableBlockIndexUniqueIdentifiers.size()]; - int identifierCounter = 0; - for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : - 
tableBlockIndexUniqueIdentifiers) { - String indexFilePath = tableBlockIndexUniqueIdentifier.getIndexFilePath(); - String fileName = tableBlockIndexUniqueIdentifier.getIndexFileName(); - carbonIndexFiles[identifierCounter++] = FileFactory - .getCarbonFile(indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + fileName); - } - for (int i = 0; i < carbonIndexFiles.length; i++) { - Path path = new Path(carbonIndexFiles[i].getPath()); - FileSystem fs = path.getFileSystem(FileFactory.getConfiguration()); - RemoteIterator iter = fs.listLocatedStatus(path); - LocatedFileStatus fileStatus = iter.next(); - String[] location = fileStatus.getBlockLocations()[0].getHosts(); - BlockletDataMapDistributable distributable = - new BlockletDataMapDistributable(path.toString()); - distributable.setLocations(location); - distributables.add(distributable); - } - } catch (IOException e) { + BlockletDataMapDistributable distributable = new BlockletDataMapDistributable(); + distributable.setSegment(segment); + distributable.setDataMapSchema(DATA_MAP_SCHEMA); + distributable.setSegmentPath(CarbonTablePath.getSegmentPath(identifier.getTablePath(), + segment.getSegmentNo())); + distributables.add(new DataMapDistributableWrapper(UUID.randomUUID().toString(), + distributable).getDistributable()); + } catch (Exception e) { throw new RuntimeException(e); } return distributables; @@ -291,8 +273,8 @@ public void fireEvent(Event event) { } @Override - public void clear(Segment segment) { - Set blockIndexes = segmentMap.remove(segment.getSegmentNo()); + public void clear(String segment) { + Set blockIndexes = segmentMap.remove(segment); if (blockIndexes != null) { for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { TableBlockIndexUniqueIdentifierWrapper blockIndexWrapper = @@ -315,22 +297,95 @@ public void clear(Segment segment) { public synchronized void clear() { if (segmentMap.size() > 0) { for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) { - clear(new Segment(segmentId, null, null)); + clear(segmentId); } } } - @Override - public List getDataMaps(DataMapDistributable distributable) + @Override public String getCacheSize() throws IOException { + long sum = 0L; + int numOfIndexFiles = 0; + for (Map.Entry> entry : segmentMap.entrySet()) { + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : entry.getValue()) { + sum += cache.get(new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + getCarbonTable())).getMemorySize(); + numOfIndexFiles++; + } + } + return numOfIndexFiles + ":" + sum; + } + + @Override public List getDataMaps(DataMapDistributable distributable) throws IOException { BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable; - List identifiersWrapper = new ArrayList<>(); - Path indexPath = new Path(mapDistributable.getFilePath()); + List identifiersWrapper; String segmentNo = mapDistributable.getSegment().getSegmentNo(); + if (mapDistributable.getSegmentPath() != null) { + identifiersWrapper = getTableBlockIndexUniqueIdentifier(distributable); + } else { + identifiersWrapper = + getTableBlockIndexUniqueIdentifier(mapDistributable.getFilePath(), segmentNo); + } + List dataMaps = new ArrayList<>(); + try { + List wrappers = cache.getAll(identifiersWrapper); + for (BlockletDataMapIndexWrapper wrapper : wrappers) { + dataMaps.addAll(wrapper.getDataMaps()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return dataMaps; + } + + private List 
getTableBlockIndexUniqueIdentifier( + DataMapDistributable distributable) throws IOException { + List identifiersWrapper = new ArrayList<>(); + Set tableBlockIndexUniqueIdentifiers = + segmentMap.get(distributable.getSegment().getSegmentNo()); + if (tableBlockIndexUniqueIdentifiers == null) { + Set indexFiles = distributable.getSegment().getCommittedIndexFile().keySet(); + for (String indexFile : indexFiles) { + CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile); + String indexFileName; + String mergeIndexName; + if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + indexFileName = carbonFile.getName(); + mergeIndexName = null; + } else { + indexFileName = carbonFile.getName(); + mergeIndexName = carbonFile.getName(); + } + String parentPath = carbonFile.getParentFile().getAbsolutePath(); + TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier = + new TableBlockIndexUniqueIdentifier(parentPath, indexFileName, mergeIndexName, + distributable.getSegment().getSegmentNo()); + identifiersWrapper.add( + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + this.getCarbonTable())); + tableBlockIndexUniqueIdentifiers = new HashSet<>(); + tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier); + segmentMap.put(distributable.getSegment().getSegmentNo(), tableBlockIndexUniqueIdentifiers); + } + } else { + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : + tableBlockIndexUniqueIdentifiers) { + identifiersWrapper.add( + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + getCarbonTable())); + } + } + return identifiersWrapper; + } + + private List getTableBlockIndexUniqueIdentifier( + String indexFilePath, String segmentId) throws IOException { + Path indexPath = new Path(indexFilePath); + List identifiersWrapper = new ArrayList<>(); if (indexPath.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { String parent = indexPath.getParent().toString(); identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper( - new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo), + new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentId), this.getCarbonTable())); } else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); @@ -340,19 +395,10 @@ public List getDataMaps(DataMapDistributable distributable) for (String indexFile : indexFiles) { identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper( new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(), - segmentNo), this.getCarbonTable())); + segmentId), this.getCarbonTable())); } } - List dataMaps = new ArrayList<>(); - try { - List wrappers = cache.getAll(identifiersWrapper); - for (BlockletDataMapIndexWrapper wrapper : wrappers) { - dataMaps.addAll(wrapper.getDataMaps()); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return dataMaps; + return identifiersWrapper; } @Override diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 224b2305038..69e5dc33286 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -342,13 +342,10 @@ public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentI */ public 
static void clearBlockDataMapCache(CarbonTable carbonTable, String segmentId) { TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable); - Segment segment = new Segment(segmentId); - List segments = new ArrayList<>(); - segments.add(segment); LOGGER.info( "clearing cache while updating segment file entry in table status file for segmentId: " + segmentId); - defaultDataMap.clear(segments); + defaultDataMap.getDataMapFactory().clear(segmentId); } private static CarbonFile[] getSegmentFiles(String segmentPath) { diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java index c8bb5ade3d0..dcc3336ce8c 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java @@ -34,6 +34,7 @@ */ public class AggregationDataMapSchema extends DataMapSchema { + private static final long serialVersionUID = 5900935117929888412L; /** * map of parent column name to set of child column column without * aggregation function @@ -63,7 +64,9 @@ public class AggregationDataMapSchema extends DataMapSchema { */ private int ordinal = Integer.MAX_VALUE; - private Set aggExpToColumnMapping; + // Dont remove transient otherwise serialization for carbonTable will fail using + // JavaSerialization in spark. + private transient Set aggExpToColumnMapping; AggregationDataMapSchema(String dataMapName, String className) { super(dataMapName, className); diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 54ea7729937..4a75420037f 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.metadata.schema.table; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -73,7 +75,7 @@ /** * Mapping class for Carbon actual table */ -public class CarbonTable implements Serializable { +public class CarbonTable implements Serializable, Writable { private static final Logger LOGGER = LogServiceFactory.getLogService(CarbonTable.class.getName()); @@ -186,7 +188,7 @@ public class CarbonTable implements Serializable { */ private boolean isTransactionalTable = true; - private CarbonTable() { + public CarbonTable() { this.tableDimensionsMap = new HashMap>(); this.tableImplicitDimensionsMap = new HashMap>(); this.tableMeasuresMap = new HashMap>(); @@ -1405,4 +1407,14 @@ public SortScopeOptions.SortScope getSortScope() { return SortScopeOptions.getSortScope(sortScope); } } + + @Override public void write(DataOutput out) throws IOException { + tableInfo.write(out); + } + + @Override public void readFields(DataInput in) throws IOException { + tableInfo = new TableInfo(); + tableInfo.readFields(in); + updateTableByTableInfo(this, tableInfo); + } } diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java index 68aad72f65a..c90c3dc3486 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java @@ -228,7 +228,7 @@ public static List getIndexFileIdentifiersFromM List tableBlockIndexUniqueIdentifiers = new ArrayList<>(); String mergeFilePath = identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getIndexFileName(); + .getMergeIndexFileName(); segmentIndexFileStore.readMergeFile(mergeFilePath); List indexFiles = segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath); diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 004a51e4e04..2b659e1f127 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -1601,4 +1601,56 @@ private void validateDetailQueryBatchSize() { } } } + + /** + * Check whether the Distributed Pruning is enabled by the user or not. + */ + public boolean isDistributedPruningEnabled(String dbName, String tableName) { + // Check if user has enabled/disabled the use of index server for the current session using + // the set command + String configuredValue = getSessionPropertyValue( + CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER + "." + dbName + "." + tableName); + if (configuredValue == null) { + // if not set in session properties then check carbon.properties for the same. + configuredValue = getProperty(CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER); + } + boolean isServerEnabledByUser = Boolean.parseBoolean(configuredValue); + if (isServerEnabledByUser) { + LOGGER.info("Distributed Index server is enabled for " + dbName + "." + tableName); + } + return isServerEnabledByUser; + } + + public String getIndexServerIP() { + return carbonProperties.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_IP, ""); + } + + public int getIndexServerPort() { + String configuredPort = + carbonProperties.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_PORT); + try { + return Integer.parseInt(configuredPort); + } catch (NumberFormatException e) { + LOGGER.error("Configured port for index server is not a valid number", e); + throw e; + } + } + + /** + * Whether fallback is disabled by the user or not. 
+ */ + public boolean isFallBackDisabled() { + return Boolean.parseBoolean(carbonProperties + .getProperty(CarbonCommonConstants.CARBON_DISABLE_INDEX_SERVER_FALLBACK, "false")); + } + + public int getNumberOfHandlersForIndexServer() { + String configuredValue = + carbonProperties.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_WORKER_THREADS); + if (configuredValue != null) { + return Integer.parseInt(configuredValue); + } + return CarbonCommonConstants.CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT; + } + } diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java index d9aa2145f2a..fbae502f9da 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -143,6 +143,7 @@ private boolean validateKeyValue(String key, String value) throws InvalidConfigu case ENABLE_UNSAFE_IN_QUERY_EXECUTION: case ENABLE_AUTO_LOAD_MERGE: case CARBON_PUSH_ROW_FILTERS_FOR_VECTOR: + case CARBON_ENABLE_INDEX_SERVER: isValid = CarbonUtil.validateBoolean(value); if (!isValid) { throw new InvalidConfigurationException("Invalid value " + value + " for key " + key); diff --git a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index 406456f4d36..931b41b44c7 100644 --- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -625,4 +625,8 @@ public long getLength() { } return this.location; } + + public void setLocation(String[] location) { + this.location = location; + } } diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 11b216e412e..03599a9b616 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -313,6 +313,8 @@ public List toDistributable(Segment segment) { filteredShards.contains(new File(shardPath).getName())) { DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable(shardPath, filteredShards); + bloomDataMapDistributable.setSegment(segment); + bloomDataMapDistributable.setDataMapSchema(getDataMapSchema()); dataMapDistributableList.add(bloomDataMapDistributable); } } @@ -325,8 +327,8 @@ public void fireEvent(Event event) { } @Override - public void clear(Segment segment) { - Set shards = segmentMap.remove(segment.getSegmentNo()); + public void clear(String segment) { + Set shards = segmentMap.remove(segment); if (shards != null) { for (String shard : shards) { for (CarbonColumn carbonColumn : dataMapMeta.getIndexedColumns()) { @@ -341,15 +343,19 @@ public synchronized void clear() { if (segmentMap.size() > 0) { List segments = new ArrayList<>(segmentMap.keySet()); for (String segmentId : segments) { - clear(new Segment(segmentId, null, null)); + clear(segmentId); } } } @Override public void deleteDatamapData(Segment segment) throws IOException { + deleteSegmentDatamapData(segment.getSegmentNo()); + } + + @Override + public void deleteSegmentDatamapData(String segmentId) throws IOException { try { - String segmentId = segment.getSegmentNo(); String datamapPath = CarbonTablePath 
.getDataMapStorePath(getCarbonTable().getTablePath(), segmentId, dataMapName); if (FileFactory.isFileExist(datamapPath)) { @@ -357,9 +363,9 @@ public void deleteDatamapData(Segment segment) throws IOException { FileFactory.getFileType(datamapPath)); CarbonUtil.deleteFoldersAndFilesSilent(file); } - clear(segment); + clear(segmentId); } catch (InterruptedException ex) { - throw new IOException("Failed to delete datamap for segment_" + segment.getSegmentNo()); + throw new IOException("Failed to delete datamap for segment_" + segmentId); } } diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java index 68c3bcc06c5..0bc23f9247f 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java @@ -221,6 +221,8 @@ public List toDistributable(Segment segment) { DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(tableIdentifier.getTablePath(), indexDir.getAbsolutePath()); + luceneDataMapDistributable.setSegment(segment); + luceneDataMapDistributable.setDataMapSchema(getDataMapSchema()); lstDataMapDistribute.add(luceneDataMapDistributable); } return lstDataMapDistribute; @@ -234,6 +236,8 @@ public List toDistributable(Segment segment) { DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable( CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()), indexDir.getAbsolutePath()); + luceneDataMapDistributable.setSegment(segment); + luceneDataMapDistributable.setDataMapSchema(getDataMapSchema()); lstDataMapDistribute.add(luceneDataMapDistributable); } return lstDataMapDistribute; @@ -244,14 +248,6 @@ public void fireEvent(Event event) { } - /** - * Clears datamap of the segment - */ - @Override - public void clear(Segment segment) { - - } - /** * Clear all datamaps from memory */ @@ -262,8 +258,11 @@ public void clear() { @Override public void deleteDatamapData(Segment segment) throws IOException { + deleteSegmentDatamapData(segment.getSegmentNo()); + } + + @Override public void deleteSegmentDatamapData(String segmentId) throws IOException { try { - String segmentId = segment.getSegmentNo(); String datamapPath = CarbonTablePath .getDataMapStorePath(tableIdentifier.getTablePath(), segmentId, dataMapName); if (FileFactory.isFileExist(datamapPath)) { diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml index 63b6bd5cf9a..cac0eb40005 100644 --- a/dev/findbugs-exclude.xml +++ b/dev/findbugs-exclude.xml @@ -58,6 +58,10 @@ + + + +
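A note for reviewers trying the patch out: the recurring pattern in the hunks above is that objects which previously only implemented Serializable (PartitionSpec, CarbonTable) now also implement Hadoop's Writable, so the driver can ship them to the distributed index server. The following is a minimal round-trip sketch of the new PartitionSpec serialization, not part of the patch itself; it assumes only the patched class and the JDK, and the class/helper names (PartitionSpecRoundTripSketch, roundTrip) are illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.carbondata.core.indexstore.PartitionSpec;

public final class PartitionSpecRoundTripSketch {

  // write(DataOutput) into a byte buffer, then readFields(DataInput) into a
  // fresh instance, mirroring how Hadoop RPC would move the object between
  // the driver and the index server.
  static PartitionSpec roundTrip(PartitionSpec spec) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      spec.write(out);
    }
    PartitionSpec copy = new PartitionSpec();  // no-arg constructor added in this patch
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
      copy.readFields(in);
    }
    return copy;
  }

  public static void main(String[] args) throws IOException {
    PartitionSpec spec = new PartitionSpec(
        Arrays.asList("country=IN", "year=2019"),
        "/store/default/sales/country=IN/year=2019");
    // The copy should echo the partitions and location of the original.
    System.out.println(roundTrip(spec));
  }
}

As the write/readFields implementations above show, only partitions, uuid and location are written with null-marker booleans; locationPath is not carried over the wire, so a deserialized copy derives it on the receiving side.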