From 201804858bf41537389008b03e7bd964cc3760fa Mon Sep 17 00:00:00 2001
From: akashrn5
Date: Fri, 11 May 2018 16:57:46 +0530
Subject: [PATCH] [CARBONDATA-2484][LUCENE] Refactor distributable code and
 launch a job to clear the datamaps from the executors (clears the segmentMap
 and removes the datamaps from the cache)

Problem:
During query, BlockletDataMapFactory maintains a segmentMap that maps
segmentId -> list of index files; it is used while getting the extended
blocklet, to check whether a blocklet is present in the index or not. In the
Lucene case a datamap job is launched and, during pruning, the segmentMap is
populated in the executors. When drop table is called this map is cleared in
the driver, but it is not cleared in the executors, so a Lucene query fired
after the table or datamap is dropped fails.

Solution:
When drop table or drop datamap is called, launch a job that clears the
datamaps from the executor-side segmentMap and cache, and only then clear
them in the driver. This PR also refactors the datamap job classes and other
common classes. Minimal illustrative sketches of the resulting flow are
appended after the diff.

This closes #2310
---
 .../core/datamap}/AbstractDataMapJob.java     |   2 +-
 .../core/datamap/DataMapChooser.java          |  19 +++
 .../carbondata/core/datamap}/DataMapJob.java  |   2 +-
 .../core/datamap/DataMapStoreManager.java     |  65 +++++++-
 .../carbondata/core/datamap/DataMapUtil.java  | 157 ++++++++++++++++++
 .../datamap}/DistributableDataMapFormat.java  |  44 +++--
 .../carbondata/core/datamap/TableDataMap.java |   9 +-
 .../dev/expr/AndDataMapExprWrapper.java       |   3 +-
 .../dev/expr/DataMapExprWrapperImpl.java      |   3 +-
 .../indexstore/BlockletDetailsFetcher.java    |   5 +
 .../blockletindex/BlockletDataMapFactory.java |   6 +-
 .../metadata/schema/table/CarbonTable.java    |   5 +
 .../core}/util/ObjectSerializationUtil.java   |   4 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java |   8 +
 .../lucene/LuceneDataMapFactoryBase.java      |   9 +
 .../hadoop/api/CarbonInputFormat.java         |  59 +------
 .../hadoop/api/CarbonTableOutputFormat.java   |   2 +-
 .../hadoop/testutil/StoreCreator.java         |   2 +-
 .../hadoop/util/CarbonInputFormatUtil.java    |  22 +--
 .../util/ObjectSerializationUtilTest.java     |   2 +-
 .../hive/MapredCarbonInputFormat.java         |   2 +-
 .../lucene/LuceneFineGrainDataMapSuite.scala  |   1 -
 .../datamap/DataMapWriterSuite.scala          |   2 +-
 .../testsuite/datamap/TestDataMapStatus.scala |   4 +-
 .../TestInsertAndOtherCommandConcurrent.scala |   4 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala  |   3 +-
 .../spark/rdd/SparkDataMapJob.scala           |   2 +-
 .../org/apache/spark/sql/CarbonEnv.scala      |  14 +-
 .../datamap/CarbonDropDataMapCommand.scala    |   8 +-
 .../management/CarbonLoadDataCommand.scala    |   4 +-
 .../datasources/SparkCarbonFileFormat.scala   |   2 +-
 .../datasources/SparkCarbonTableFormat.scala  |   3 +-
 .../streaming/CarbonStreamOutputFormat.java   |   2 +-
 33 files changed, 357 insertions(+), 122 deletions(-)
 rename {hadoop/src/main/java/org/apache/carbondata/hadoop/api => core/src/main/java/org/apache/carbondata/core/datamap}/AbstractDataMapJob.java (97%)
 rename {hadoop/src/main/java/org/apache/carbondata/hadoop/api => core/src/main/java/org/apache/carbondata/core/datamap}/DataMapJob.java (97%)
 create mode 100644 core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
 rename {hadoop/src/main/java/org/apache/carbondata/hadoop/api => core/src/main/java/org/apache/carbondata/core/datamap}/DistributableDataMapFormat.java (76%)
 rename {hadoop/src/main/java/org/apache/carbondata/hadoop => core/src/main/java/org/apache/carbondata/core}/util/ObjectSerializationUtil.java (97%)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
similarity index 97%
rename from hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
rename to core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
index 68351846448..bdbf9fc1573 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
-package org.apache.carbondata.hadoop.api;
+package org.apache.carbondata.core.datamap;
 
 import java.util.List;
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index 7cdabd6f7ed..4d1c718d8be 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -130,6 +130,25 @@ private DataMapExprWrapper chooseDataMap(DataMapLevel level, FilterResolverIntf
     return null;
   }
 
+  /**
+   * Get all datamaps of the table, for clearing purposes
+   */
+  public DataMapExprWrapper getAllDataMapsForClear(CarbonTable carbonTable)
+      throws IOException {
+    List<TableDataMap> allDataMapFG =
+        DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
+    DataMapExprWrapper initialExpr = null;
+    if (allDataMapFG.size() > 0) {
+      initialExpr = new DataMapExprWrapperImpl(allDataMapFG.get(0), null);
+
+      for (int i = 1; i < allDataMapFG.size(); i++) {
+        initialExpr = new AndDataMapExprWrapper(initialExpr,
+            new DataMapExprWrapperImpl(allDataMapFG.get(i), null), null);
+      }
+    }
+    return initialExpr;
+  }
+
   /**
    * Returns default blocklet datamap
    * @param carbonTable
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
similarity index 97%
rename from hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
rename to core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
index c43921982cb..57a739d50ba 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.carbondata.hadoop.api;
+package org.apache.carbondata.core.datamap;
 
 import java.io.Serializable;
 import java.util.List;
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 072b86eb69d..c739dc346c8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -36,6 +36,7 @@
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
@@ -57,6 +58,10 @@ public final class DataMapStoreManager {
 
   private static DataMapStoreManager instance = new DataMapStoreManager();
 
+  public Map<String, List<TableDataMap>> getAllDataMaps() {
+    return allDataMaps;
+  }
+
   /**
    * Contains the list of datamaps for each table.
    */
@@ -364,17 +369,58 @@ public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
+    CarbonTable carbonTable = getCarbonTable(identifier);
     String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
     List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+    if (null != carbonTable && tableIndices != null) {
+      try {
+        DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
+      } catch (IOException e) {
+        LOGGER.error(e, "clear dataMap job failed");
+        // ignoring the exception
+      }
+    }
     segmentRefreshMap.remove(identifier.uniqueName());
+    clearDataMaps(tableUniqueName);
+    allDataMaps.remove(tableUniqueName);
+  }
+
+  /**
+   * This method returns the CarbonTable for the given identifier
+   * @param identifier
+   * @return
+   */
+  public CarbonTable getCarbonTable(AbsoluteTableIdentifier identifier) {
+    CarbonTable carbonTable = null;
+    carbonTable = CarbonMetadata.getInstance()
+        .getCarbonTable(identifier.getDatabaseName(), identifier.getTableName());
+    if (carbonTable == null) {
+      try {
+        carbonTable = CarbonTable
+            .buildFromTablePath(identifier.getTableName(), identifier.getDatabaseName(),
+                identifier.getTablePath());
+      } catch (IOException e) {
+        LOGGER.error("failed to get carbon table from table path");
+        // ignoring exception
+      }
+    }
+    return carbonTable;
+  }
+
+  /**
+   * This method clears the datamaps of a table from memory
+   */
+  public void clearDataMaps(String tableUniqName) {
+    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqName);
     if (tableIndices != null) {
       for (TableDataMap tableDataMap : tableIndices) {
         if (tableDataMap != null) {
+          // clear the segmentMap in BlockletDetailsFetcher, else the segments will remain in the
+          // executor and the query fails, as we check whether the blocklet is present in the index
+          tableDataMap.getBlockletDetailsFetcher().clear();
           tableDataMap.clear();
-          break;
         }
       }
-      allDataMaps.remove(tableUniqueName);
     }
   }
 
@@ -384,14 +430,21 @@ public void clearDataMaps(AbsoluteTableIdentifier identifier) {
    * @param identifier Table identifier
    */
   public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
-    List<TableDataMap> tableIndices =
-        allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
+    CarbonTable carbonTable = getCarbonTable(identifier);
+    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
+    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
     if (tableIndices != null) {
       int i = 0;
       for (TableDataMap tableDataMap : tableIndices) {
-        if (tableDataMap != null && dataMapName
+        if (carbonTable != null && tableDataMap != null && dataMapName
            .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
-          tableDataMap.clear();
+          try {
+            DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
+            tableDataMap.clear();
+          } catch (IOException e) {
+            LOGGER.error(e, "clear dataMap job failed");
+            // ignoring the exception
+          }
           tableDataMap.deleteDatamapData();
           tableIndices.remove(i);
           break;
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
new file mode 100644
index 00000000000..e3d3194d8de
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class DataMapUtil {
+
+  private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataMapUtil.class.getName());
+
+  /**
+   * Creates an instance of the DataMap job class
+   *
+   * @param className
+   * @return
+   */
+  public static Object createDataMapJob(String className) {
+    try {
+      return Class.forName(className).getDeclaredConstructors()[0].newInstance();
+    } catch (Exception e) {
+      LOGGER.error(e);
+      return null;
+    }
+  }
+
+  /**
+   * This method sets the datamapJob in the configuration
+   * @param configuration
+   * @param dataMapJob
+   * @throws IOException
+   */
+  public static void setDataMapJob(Configuration configuration, Object dataMapJob)
+      throws IOException {
+    if (dataMapJob != null) {
+      String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
+      configuration.set(DATA_MAP_DSTR, toString);
+    }
+  }
+
+  /**
+   * Gets the datamap job from the configuration
+   * @param configuration job configuration
+   * @return DataMap job
+   * @throws IOException
+   */
+  public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+    String jobString = configuration.get(DATA_MAP_DSTR);
+    if (jobString != null) {
+      return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
+    }
+    return null;
+  }
+
+  /**
+   * This method gets the datamapJob and calls execute; the job is launched before the datamaps
+   * are cleared on the driver side during drop table and drop datamap, and it clears the
+   * datamaps on the executor side
+   * @param carbonTable
+   * @throws IOException
+   */
+  public static void executeDataMapJobForClearingDataMaps(CarbonTable carbonTable)
+      throws IOException {
+    String dataMapJobClassName = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
+    DataMapJob dataMapJob = (DataMapJob) createDataMapJob(dataMapJobClassName);
+    String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
+        getValidAndInvalidSegments(carbonTable);
+    List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
+    List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
+    DataMapExprWrapper dataMapExprWrapper = null;
+    if (DataMapStoreManager.getInstance().getAllDataMap(carbonTable).size() > 0) {
+      DataMapChooser dataMapChooser = new DataMapChooser(carbonTable);
+      dataMapExprWrapper = dataMapChooser.getAllDataMapsForClear(carbonTable);
+    } else {
+      return;
+    }
+    DistributableDataMapFormat dataMapFormat =
+        createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments, null,
+            className, true);
+    dataMapJob.execute(dataMapFormat, null);
+  }
+
+  private static DistributableDataMapFormat createDataMapJob(CarbonTable carbonTable,
+      DataMapExprWrapper dataMapExprWrapper, List<Segment> validsegments,
+      List<Segment> invalidSegments, List<PartitionSpec> partitionsToPrune, String clsName,
+      boolean isJobToClearDataMaps) {
+    try {
+      Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0];
+      return (DistributableDataMapFormat) cons
+          .newInstance(carbonTable, dataMapExprWrapper, validsegments, invalidSegments,
+              partitionsToPrune, isJobToClearDataMaps);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * This method gets the datamapJob and calls its execute method; it is launched for
+   * distributed CG or FG pruning
+   * @return list of extended blocklets after pruning
+   */
+  public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
+      FilterResolverIntf resolver, List<Segment> validSegments,
+      DataMapExprWrapper dataMapExprWrapper, DataMapJob dataMapJob,
+      List<PartitionSpec> partitionsToPrune) throws IOException {
+    String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
+        getValidAndInvalidSegments(carbonTable);
+    List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
+    DistributableDataMapFormat dataMapFormat =
+        createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments,
+            partitionsToPrune, className, false);
+    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat, resolver);
+    // Apply expression on the blocklets.
+    prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
+    return prunedBlocklets;
+  }
+
+  private static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
+      CarbonTable carbonTable) throws IOException {
+    SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+    return ssm.getValidAndInvalidSegments();
+  }
+
+}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
similarity index 76%
rename from hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
rename to core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 213c5a56a2f..4200414d6ec 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -14,24 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.carbondata.hadoop.api;
+package org.apache.carbondata.core.datamap;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -54,18 +52,27 @@ public class DistributableDataMapFormat extends FileInputFormat<Void,
 
   private List<Segment> validSegments;
 
-  private String className;
+  private List<Segment> invalidSegments;
 
   private List<PartitionSpec> partitions;
 
-  DistributableDataMapFormat(CarbonTable table,
-      DataMapExprWrapper dataMapExprWrapper, List<Segment> validSegments,
-      List<PartitionSpec> partitions, String className) {
+  private DataMapDistributableWrapper distributable;
+
+  private boolean isJobToClearDataMaps = false;
+
+  DistributableDataMapFormat(CarbonTable table, DataMapExprWrapper dataMapExprWrapper,
+      List<Segment> validSegments, List<Segment> invalidSegments, List<PartitionSpec> partitions,
+      boolean isJobToClearDataMaps) {
     this.table = table;
    this.dataMapExprWrapper = dataMapExprWrapper;
     this.validSegments = validSegments;
-    this.className = className;
+    this.invalidSegments = invalidSegments;
     this.partitions = partitions;
+    this.isJobToClearDataMaps = isJobToClearDataMaps;
+  }
+
+  public boolean isJobToClearDataMaps() {
+    return isJobToClearDataMaps;
   }
 
   public static void setFilterExp(Configuration configuration, FilterResolverIntf filterExp)
@@ -103,10 +110,21 @@ public RecordReader<Void, ExtendedBlocklet> createRecordReader(InputSplit inputS
       @Override
       public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           throws IOException, InterruptedException {
-        DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
-        TableDataMap dataMap = DataMapStoreManager.getInstance()
+        distributable = (DataMapDistributableWrapper) inputSplit;
+        // clear the segmentMap and the cache in the executor when there are invalid segments
+        if (invalidSegments.size() > 0) {
+          DataMapStoreManager.getInstance().clearInvalidSegments(table, invalidSegments);
+        }
+        TableDataMap tableDataMap = DataMapStoreManager.getInstance()
            .getDataMap(table, distributable.getDistributable().getDataMapSchema());
-        List<ExtendedBlocklet> blocklets = dataMap.prune(distributable.getDistributable(),
+        if (isJobToClearDataMaps) {
+          // if the job is to clear datamaps, just clear them from the cache and return
+          DataMapStoreManager.getInstance()
+              .clearDataMaps(table.getCarbonTableIdentifier().getTableUniqueName());
+          blockletIterator = Collections.emptyIterator();
+          return;
+        }
+        List<ExtendedBlocklet> blocklets = tableDataMap.prune(distributable.getDistributable(),
            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
         for (ExtendedBlocklet blocklet : blocklets) {
           blocklet.setDataMapUniqueId(distributable.getUniqueId());
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 314b5158add..b8254d4d4ba 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -74,6 +74,10 @@ public final class TableDataMap extends OperationEventListener { this.segmentPropertiesFetcher = segmentPropertiesFetcher; } + public BlockletDetailsFetcher getBlockletDetailsFetcher() { + return blockletDetailsFetcher; + } + /** * Pass the valid segments and prune the datamap using filter expression * @@ -122,8 +126,9 @@ private List addSegmentId(List pruneBlocklet public List toDistributable(List segments) throws IOException { List distributables = new ArrayList<>(); for (Segment segment : segments) { - List list = dataMapFactory.toDistributable(segment); - for (DataMapDistributable distributable: list) { + List list = + dataMapFactory.toDistributable(segment); + for (DataMapDistributable distributable : list) { distributable.setDataMapSchema(dataMapSchema); distributable.setSegment(segment); distributable.setTablePath(identifier.getTablePath()); diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java index 199f9936255..1de16bcaa7f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java @@ -87,7 +87,8 @@ public List prune(List segments, List return null; } - @Override public List toDistributable(List segments) + @Override + public List toDistributable(List segments) throws IOException { List wrappers = new ArrayList<>(); wrappers.addAll(left.toDistributable(segments)); diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java index 0a3896c0213..38f23369ed3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java @@ -74,7 +74,8 @@ public List prune(List segments, List return null; } - @Override public List toDistributable(List segments) + @Override + public List toDistributable(List segments) throws IOException { List dataMapDistributables = dataMap.toDistributable(segments); List wrappers = new ArrayList<>(); diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java index 58c11db156e..1971f40d486 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java @@ -55,4 +55,9 @@ List getExtendedBlocklets(List blocklets, Segment se */ List getAllBlocklets(Segment segment, List partitions) throws IOException; + + /** + * clears the datamap from cache and segmentMap from executor + */ + void clear(); } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 
e56c2d0c661..021fb82df4f 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -282,8 +282,10 @@ public void clear(Segment segment) { @Override public void clear() { - for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) { - clear(new Segment(segmentId, null, null)); + if (segmentMap.size() > 0) { + for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) { + clear(new Segment(segmentId, null, null)); + } } } diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 5acca27d243..9d648f5ee03 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -231,6 +231,11 @@ public static CarbonTable buildFromTablePath(String tableName, String tablePath, } } + public static CarbonTable buildFromTablePath(String tableName, String dbName, String tablePath) + throws IOException { + return SchemaReader + .readCarbonTableFromStore(AbsoluteTableIdentifier.from(tablePath, dbName, tableName)); + } /** * @param tableInfo */ diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java similarity index 97% rename from hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java rename to core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java index d97df2d363c..020787de681 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.carbondata.hadoop.util; +package org.apache.carbondata.core.util; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -24,8 +24,6 @@ import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import org.apache.carbondata.core.util.CarbonUtil; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 581c3a62641..16b49f24166 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -252,6 +252,14 @@ public List toDistributable(Segment segment) { List dataMapDistributableList = new ArrayList<>(); CarbonFile[] indexDirs = getAllIndexDirs(getCarbonTable().getTablePath(), segment.getSegmentNo()); + if (segment.getFilteredIndexShardNames().size() == 0) { + for (CarbonFile indexDir : indexDirs) { + DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable( + indexDir.getAbsolutePath()); + dataMapDistributableList.add(bloomDataMapDistributable); + } + return dataMapDistributableList; + } for (CarbonFile indexDir : indexDirs) { // Filter out the tasks which are filtered through CG datamap. if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) { diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java index 4c6aec3f034..4bcdebb9ed6 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java @@ -166,6 +166,15 @@ public List toDistributable(Segment segment) { List lstDataMapDistribute = new ArrayList<>(); CarbonFile[] indexDirs = getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo()); + if (segment.getFilteredIndexShardNames().size() == 0) { + for (CarbonFile indexDir : indexDirs) { + DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable( + CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()), + indexDir.getAbsolutePath()); + lstDataMapDistribute.add(luceneDataMapDistributable); + } + return lstDataMapDistribute; + } for (CarbonFile indexDir : indexDirs) { // Filter out the tasks which are filtered through CG datamap. 
if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) { diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index c5365d55f4c..91da93fe19c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -28,12 +28,13 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal; import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapJob; +import org.apache.carbondata.core.datamap.DataMapUtil; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.exception.InvalidConfigurationException; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; -import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.PartitionInfo; @@ -54,6 +55,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeConverter; import org.apache.carbondata.core.util.DataTypeConverterImpl; +import org.apache.carbondata.core.util.ObjectSerializationUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; @@ -61,7 +63,6 @@ import org.apache.carbondata.hadoop.CarbonRecordReader; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -103,7 +104,6 @@ public abstract class CarbonInputFormat extends FileInputFormat { "mapreduce.input.carboninputformat.transactional"; private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter"; - private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr"; public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName"; public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; private static final String PARTITIONS_TO_PRUNE = @@ -171,22 +171,6 @@ public static void setPartitionIdList(Configuration configuration, List configuration.set(ALTER_PARTITION_ID, partitionIds.toString()); } - public static void setDataMapJob(Configuration configuration, Object dataMapJob) - throws IOException { - if (dataMapJob != null) { - String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob); - configuration.set(DATA_MAP_DSTR, toString); - } - } - - public static DataMapJob getDataMapJob(Configuration configuration) throws IOException { - String jobString = configuration.get(DATA_MAP_DSTR); - if (jobString != null) { - return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString); - } - return null; - } - /** * It 
sets unresolved filter expression. * @@ -416,7 +400,7 @@ private List getPrunedBlocklets(JobContext job, CarbonTable ca boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT)); - DataMapJob dataMapJob = getDataMapJob(job.getConfiguration()); + DataMapJob dataMapJob = DataMapUtil.getDataMapJob(job.getConfiguration()); List partitionsToPrune = getPartitionsToPrune(job.getConfiguration()); // First prune using default datamap on driver side. DataMapExprWrapper dataMapExprWrapper = DataMapChooser @@ -436,8 +420,8 @@ private List getPrunedBlocklets(JobContext job, CarbonTable ca pruneSegments(segmentIds, prunedBlocklets); // Again prune with CG datamap. if (distributedCG && dataMapJob != null) { - prunedBlocklets = - executeDataMapJob(carbonTable, resolver, segmentIds, cgDataMapExprWrapper, dataMapJob, + prunedBlocklets = DataMapUtil + .executeDataMapJob(carbonTable, resolver, segmentIds, cgDataMapExprWrapper, dataMapJob, partitionsToPrune); } else { prunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune); @@ -452,8 +436,8 @@ private List getPrunedBlocklets(JobContext job, CarbonTable ca if (fgDataMapExprWrapper != null) { // Prune segments from already pruned blocklets pruneSegments(segmentIds, prunedBlocklets); - prunedBlocklets = - executeDataMapJob(carbonTable, resolver, segmentIds, fgDataMapExprWrapper, dataMapJob, + prunedBlocklets = DataMapUtil + .executeDataMapJob(carbonTable, resolver, segmentIds, fgDataMapExprWrapper, dataMapJob, partitionsToPrune); ExplainCollector.recordFGDataMapPruning( @@ -463,33 +447,6 @@ private List getPrunedBlocklets(JobContext job, CarbonTable ca return prunedBlocklets; } - private List executeDataMapJob(CarbonTable carbonTable, - FilterResolverIntf resolver, List segmentIds, DataMapExprWrapper dataMapExprWrapper, - DataMapJob dataMapJob, List partitionsToPrune) throws IOException { - String className = "org.apache.carbondata.hadoop.api.DistributableDataMapFormat"; - FileInputFormat dataMapFormat = - createDataMapJob(carbonTable, dataMapExprWrapper, segmentIds, partitionsToPrune, className); - List prunedBlocklets = - dataMapJob.execute((DistributableDataMapFormat) dataMapFormat, resolver); - // Apply expression on the blocklets. - prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets); - return prunedBlocklets; - } - - - public static FileInputFormat createDataMapJob(CarbonTable carbonTable, - DataMapExprWrapper dataMapExprWrapper, List segments, - List partitionsToPrune, String clsName) { - try { - Constructor cons = Class.forName(clsName).getDeclaredConstructors()[0]; - return (FileInputFormat) cons - .newInstance(carbonTable, dataMapExprWrapper, segments, partitionsToPrune, - BlockletDataMapFactory.class.getName()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - /** * Prune the segments from the already pruned blocklets. 
* @param segments diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 7050c8f9ee6..b2ff3ab507d 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -33,8 +33,8 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; +import org.apache.carbondata.core.util.ObjectSerializationUtil; import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; import org.apache.carbondata.processing.loading.DataLoadExecutor; import org.apache.carbondata.processing.loading.TableProcessingOperations; import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper; diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java index 9075012ef84..9fd1812a209 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java @@ -167,7 +167,7 @@ public static void createCarbonStore() throws Exception { /** * Method to clear the data maps */ - public static void clearDataMaps() { + public static void clearDataMaps() throws IOException { DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier); } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index 3208a28769d..af7397b0988 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -26,6 +26,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal; +import org.apache.carbondata.core.datamap.DataMapJob; +import org.apache.carbondata.core.datamap.DataMapUtil; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.exception.InvalidConfigurationException; import org.apache.carbondata.core.indexstore.PartitionSpec; @@ -38,7 +40,6 @@ import org.apache.carbondata.hadoop.CarbonProjection; import org.apache.carbondata.hadoop.api.CarbonInputFormat; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; -import org.apache.carbondata.hadoop.api.DataMapJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -119,7 +120,7 @@ private static CarbonTableInputFormat createInputFormat( CarbonInputFormat.setFilterPredicates(conf, filterExpression); CarbonInputFormat.setColumnProjection(conf, columnProjection); if (dataMapJob != null) { - CarbonInputFormat.setDataMapJob(conf, dataMapJob); + DataMapUtil.setDataMapJob(conf, dataMapJob); } else { setDataMapJobIfConfigured(conf); } @@ -164,22 +165,7 @@ private static CarbonTableInputFormat createInputFormat( */ public static void setDataMapJobIfConfigured(Configuration conf) throws IOException { String className = 
"org.apache.carbondata.spark.rdd.SparkDataMapJob"; - CarbonTableInputFormat.setDataMapJob(conf, createDataMapJob(className)); - } - - /** - * Creates instance for the DataMap Job class - * - * @param className - * @return - */ - public static Object createDataMapJob(String className) { - try { - return Class.forName(className).getDeclaredConstructors()[0].newInstance(); - } catch (Exception e) { - LOGGER.error(e); - return null; - } + DataMapUtil.setDataMapJob(conf, DataMapUtil.createDataMapJob(className)); } public static String createJobTrackerID(java.util.Date date) { diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/ObjectSerializationUtilTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/ObjectSerializationUtilTest.java index 6046aca8ec2..a6ec303dcd6 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/ObjectSerializationUtilTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/ObjectSerializationUtilTest.java @@ -18,11 +18,11 @@ package org.apache.carbondata.hadoop.test.util; import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.expression.LiteralExpression; import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; +import org.apache.carbondata.core.util.ObjectSerializationUtil; import junit.framework.TestCase; import org.junit.Assert; diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java index 1cf23694a6d..89a5ed658e5 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java @@ -30,10 +30,10 @@ import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.model.QueryModelBuilder; import org.apache.carbondata.core.util.DataTypeConverterImpl; +import org.apache.carbondata.core.util.ObjectSerializationUtil; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.InvalidPathException; diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala index 6d2eb3f1981..89623cf90ae 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala @@ -124,7 +124,6 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { | USING 'lucene' | DMProperties('INDEX_COLUMNS'='Name , cIty') """.stripMargin) - checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'")) checkAnswer(sql("SELECT * FROM datamap_test WHERE 
TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'")) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index ffbcf67a903..42502695562 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -66,7 +66,7 @@ class C2DataMapFactory( * @return */ override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = { - ??? + util.Collections.emptyList() } /** diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala index cccfb3f9426..0c4f6528cbc 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala @@ -261,7 +261,9 @@ class TestDataMapFactory( override def getMeta: DataMapMeta = new DataMapMeta(carbonTable.getIndexedColumns(dataMapSchema), Seq(ExpressionType.EQUALS).asJava) - override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ??? + override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = { + util.Collections.emptyList() + } /** * delete datamap data if any diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index 3cabc7bce9c..1657a808f36 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -334,7 +334,9 @@ class WaitingDataMapFactory( override def getMeta: DataMapMeta = new DataMapMeta(carbonTable.getIndexedColumns(dataMapSchema), Seq(ExpressionType.EQUALS).asJava) - override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ??? 
+ override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = { + util.Collections.emptyList() + } /** * delete datamap data if any diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index b9a3371afbc..67ea33205e2 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -44,8 +44,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommon import org.apache.carbondata.core.datastore.block.Distributable import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} -import org.apache.carbondata.core.profiler.ExplainCollector +import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.filter.FilterUtil import org.apache.carbondata.core.scan.model.QueryModel diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala index f51c3bc5608..6ee566c971f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala @@ -27,9 +27,9 @@ import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskAttemptID, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException} +import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat} import org.apache.carbondata.core.indexstore.ExtendedBlocklet import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf -import org.apache.carbondata.hadoop.api.{AbstractDataMapJob, DistributableDataMapFormat} /** * Spark job to execute datamap job and prune all the datamaps distributable diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 2f23d777eb0..3e0a0352f76 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import java.io.File import java.util.concurrent.ConcurrentHashMap import scala.util.Try @@ -26,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.execution.command.preaaggregate._ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction -import org.apache.spark.sql.hive.{HiveSessionCatalog, _} +import org.apache.spark.sql.hive._ import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.logging.LogServiceFactory @@ -216,14 +217,19 @@ object CarbonEnv { var isRefreshed = false val carbonEnv = getInstance(sparkSession) val table = carbonEnv.carbonMetastore.getTableFromMetadataCache( - identifier.database.getOrElse("default"), identifier.table) + 
identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
+      identifier.table)
     if (table.isEmpty || (table.isDefined && carbonEnv.carbonMetastore
       .checkSchemasModifiedTimeAndReloadTable(identifier))) {
       sparkSession.sessionState.catalog.refreshTable(identifier)
+      val tablePath = CarbonProperties.getStorePath + File.separator + identifier.database
+        .getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase) +
+        File.separator + identifier.table
       DataMapStoreManager.getInstance().
-        clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath,
-          identifier.database.getOrElse("default"), identifier.table))
+        clearDataMaps(AbsoluteTableIdentifier.from(tablePath,
+          identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
+          identifier.table))
       isRefreshed = true
     }
     isRefreshed
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index a27b69423b8..f1ed5d15150 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
-import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
 import org.apache.carbondata.events._
 
 /**
@@ -198,6 +198,12 @@ case class CarbonDropDataMapCommand(
           dataMapProvider =
             DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
           DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName)
+          // if it is an index datamap provider such as Lucene, call cleanData, which launches a
+          // job to clear the datamap from memory (clears the segmentMap and cache). This is done
+          // before deleting the datamap schemas from the _System folder
+          if (dataMapProvider.isInstanceOf[IndexDataMapProvider]) {
+            dataMapProvider.cleanData()
+          }
           dataMapProvider.cleanMeta()
         }
       } catch {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 3bef4b6e115..5ce510bb075 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -60,12 +60,10 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil, ObjectSerializationUtil}
 import
org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala index 35db1f52b8c..1da65078340 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala @@ -53,7 +53,7 @@ import org.apache.carbondata.core.scan.model.QueryModel import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats} -import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, DataMapJob} +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat} import org.apache.carbondata.spark.util.CarbonScalaUtil @InterfaceAudience.User diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala index d6eab1d72e2..ac41d2e3bae 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala @@ -44,12 +44,11 @@ import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} -import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil, ObjectSerializationUtil} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter import org.apache.carbondata.hadoop.internal.ObjectArrayWritable -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.spark.util.{CarbonScalaUtil, SparkDataTypeConverterImpl, Util} diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java index f9f0d76dcb1..0a7beb877e7 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java +++ 
b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java @@ -21,7 +21,7 @@ import java.nio.charset.Charset; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.core.util.ObjectSerializationUtil; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.hadoop.conf.Configuration;
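
A minimal, self-contained sketch of the clearing pattern this patch introduces, as referenced
from the commit message. Every name below (ClearDataMapSketch, clearOnExecutor, dropTable, the
map contents) is an illustrative stand-in, not a CarbonData API: the point is only that the
driver must fan the clear out to every executor-local segmentMap — which is what
executeDataMapJobForClearingDataMaps does through DistributableDataMapFormat with
isJobToClearDataMaps = true — before it clears its own bookkeeping.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClearDataMapSketch {

  // Stand-in for the executor-local BlockletDataMapFactory.segmentMap
  // (segmentId -> index files); each executor JVM holds its own copy.
  private static final Map<String, List<String>> SEGMENT_MAP = new ConcurrentHashMap<>();

  // Stand-in for the per-split work DistributableDataMapFormat.initialize does
  // when isJobToClearDataMaps is true: clear the executor-side state and return.
  private static void clearOnExecutor() {
    SEGMENT_MAP.clear();
  }

  // Stand-in for DataMapStoreManager.clearDataMaps(identifier): run the
  // distributed clear first, then drop the driver-side bookkeeping.
  private static void dropTable(Map<String, Object> driverState) {
    clearOnExecutor();   // in the patch: a Spark job over all executors
    driverState.clear(); // in the patch: segmentRefreshMap / allDataMaps
  }

  public static void main(String[] args) {
    SEGMENT_MAP.put("0", Arrays.asList("0_batchno0-0.carbonindex"));
    Map<String, Object> driverState = new ConcurrentHashMap<>();
    driverState.put("default_datamap_test", new Object());
    dropTable(driverState);
    // Without the executor-side clear, SEGMENT_MAP would keep segment 0 and a
    // later Lucene query would consult a stale index mapping and fail.
    System.out.println(SEGMENT_MAP.isEmpty() && driverState.isEmpty()); // true
  }
}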
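The refactor also centralizes how a DataMapJob travels from the driver into the input format. A
hedged usage sketch follows; the wrapper class DataMapJobRoundTrip is hypothetical, while the
DataMapUtil methods and the configuration key are the ones added in this patch. Note that
createDataMapJob returns null (and logs the error) when the Spark class is not on the classpath,
and setDataMapJob itself ignores a null job.

import java.io.IOException;

import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapUtil;

import org.apache.hadoop.conf.Configuration;

public class DataMapJobRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    // Driver side: instantiate the job by reflection and serialize it into the
    // configuration under "mapreduce.input.carboninputformat.datamapdstr"
    // (ObjectSerializationUtil GZIP-compresses and string-encodes the object).
    Object job = DataMapUtil.createDataMapJob("org.apache.carbondata.spark.rdd.SparkDataMapJob");
    if (job != null) {
      DataMapUtil.setDataMapJob(conf, job);
    }

    // Input-format side (see CarbonInputFormat.getPrunedBlocklets): recover the
    // job; null means no distributed pruning job was configured.
    DataMapJob restored = DataMapUtil.getDataMapJob(conf);
    System.out.println(restored != null ? "distributed pruning enabled" : "driver-side only");
  }
}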