From 2a9604cd840dc1a552afcff23059ac9bf624e161 Mon Sep 17 00:00:00 2001
From: kunal642
Date: Wed, 8 Aug 2018 21:50:44 +0530
Subject: [PATCH] [CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to the executor by serializing the Hadoop configuration from the driver.

Add SK/AK to a thread local so that on each query a new SK/AK can be passed to FileFactory.

Refactor FileFactory to accept the configuration from the thread local.

Fix the compatibility issue from 1.3.x to 1.5.x [CARBONDATA-2865].

This closes #2623
---
 .../datastore/impl/DFSFileReaderImpl.java | 8 ++-
 .../impl/DefaultFileTypeProvider.java | 4 +-
 .../core/datastore/impl/FileFactory.java | 27 ++++++---
 .../datastore/impl/FileTypeInterface.java | 2 +-
 .../scan/executor/QueryExecutorFactory.java | 12 ++--
 .../executor/impl/AbstractQueryExecutor.java | 5 +-
 .../executor/impl/DetailQueryExecutor.java | 6 ++
 .../impl/SearchModeDetailQueryExecutor.java | 4 +-
 .../SearchModeVectorDetailQueryExecutor.java | 5 +-
 .../impl/VectorDetailQueryExecutor.java | 6 ++
 .../core/util/CarbonProperties.java | 6 +-
 .../carbondata/core/util/CarbonUtil.java | 6 +-
 .../carbondata/core/util/SessionParams.java | 5 +-
 .../core/util/ThreadLocalSessionInfo.java | 20 +++++++
 .../store/impl/DFSFileReaderImplUnitTest.java | 3 +-
 .../datamap/lucene/LuceneDataMapWriter.java | 2 +-
 .../carbondata/hadoop/CarbonRecordReader.java | 10 ++--
 .../hadoop/api/CarbonInputFormat.java | 3 +-
 .../hadoop/api/CarbonTableInputFormat.java | 1 -
 .../hadoop/api/CarbonTableOutputFormat.java | 7 ++-
 .../hadoop/util/CarbonInputFormatUtil.java | 17 ------
 .../hive/CarbonHiveRecordReader.java | 2 +-
 .../presto/CarbondataPageSourceProvider.java | 3 +-
 .../PrestoCarbonVectorizedRecordReader.java | 3 +-
 .../presto/impl/CarbonTableReader.java | 2 +-
 .../presto/server/PrestoServer.scala | 1 +
 ...eFineGrainDataMapWithSearchModeSuite.scala | 3 +-
 .../createTable/TestCreateTableAsSelect.scala | 10 ++--
 .../carbondata/spark/load/CsvRDDHelper.scala | 9 ++-
 .../load/DataLoadProcessBuilderOnSpark.scala | 8 ++-
 .../load/DataLoadProcessorStepOnSpark.scala | 10 +++-
 .../spark/rdd/AlterTableAddColumnRDD.scala | 9 +--
 .../spark/rdd/AlterTableDropColumnRDD.scala | 11 ++--
 .../rdd/AlterTableLoadPartitionRDD.scala | 7 ++-
 .../spark/rdd/CarbonDropPartitionRDD.scala | 11 ++--
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala | 28 ++++++----
 .../spark/rdd/CarbonIUDMergerRDD.scala | 15 ++---
 .../spark/rdd/CarbonMergerRDD.scala | 15 ++---
 .../carbondata/spark/rdd/CarbonRDD.scala | 55 +++++++++----------
 .../spark/rdd/CarbonScanPartitionRDD.scala | 15 +++--
 .../carbondata/spark/rdd/CarbonScanRDD.scala | 15 +++--
 .../spark/rdd/NewCarbonDataLoadRDD.scala | 53 +++++-------------
 .../spark/rdd/SparkDataMapJob.scala | 21 ++++---
 .../spark/rdd/StreamHandoffRDD.scala | 14 ++---
 .../carbondata/spark/util/CommonUtil.scala | 10 ++--
 .../spark/util/GlobalDictionaryUtil.scala | 20 +++++--
 .../spark/rdd/CarbonMergeFilesRDD.scala | 11 ++--
 .../spark/rdd/DataLoadCoalescedRDD.scala | 7 ++-
 .../command/carbonTableSchemaCommon.scala | 2 +-
 .../apache/spark/sql/util/SparkSQLUtil.scala | 4 ++
 .../apache/spark/util/PartitionUtils.scala | 2 +-
 .../VectorizedCarbonRecordReader.java | 3 +-
 .../datasources/SparkCarbonFileFormat.scala | 3 +-
 .../datamap/IndexDataMapRebuildRDD.scala | 16 +++---
 .../spark/rdd/CarbonDataRDDFactory.scala | 26 +++++----
 .../spark/rdd/CarbonTableCompactor.scala | 4 +-
 .../apache/spark/sql/CarbonCountStar.scala | 6 +-
 .../sql/CarbonDatasourceHadoopRelation.scala | 1 -
 .../spark/sql/CarbonDictionaryDecoder.scala | 15 +++--
.../org/apache/spark/sql/CarbonEnv.scala | 6 +- .../org/apache/spark/sql/CarbonSession.scala | 3 +- .../org/apache/spark/sql/CarbonSource.scala | 1 - .../sql/events/MergeIndexEventListener.scala | 14 ++--- .../management/CarbonInsertIntoCommand.scala | 7 ++- .../management/CarbonLoadDataCommand.scala | 19 ++++--- .../command/mutation/DeleteExecution.scala | 16 +++--- .../mutation/HorizontalCompaction.scala | 6 ++ ...onAlterTableDropHivePartitionCommand.scala | 2 +- .../CarbonAlterTableAddColumnCommand.scala | 4 +- .../CarbonAlterTableDropColumnCommand.scala | 2 +- .../table/CarbonCreateTableCommand.scala | 6 +- .../sql/execution/strategy/DDLStrategy.scala | 4 +- .../org/apache/spark/util/TableLoader.scala | 3 +- .../merger/CarbonCompactionExecutor.java | 12 ++-- .../spliter/AbstractCarbonQueryExecutor.java | 7 ++- .../spliter/CarbonSplitExecutor.java | 6 +- .../sdk/file/CarbonReaderBuilder.java | 3 + .../sdk/file/CarbonWriterBuilder.java | 6 ++ .../carbondata/sdk/file/JsonCarbonWriter.java | 3 +- .../store/worker/SearchRequestHandler.java | 3 +- 80 files changed, 428 insertions(+), 314 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java index 1a0cd4179cc..e86fa123c75 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.FileReader; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -37,7 +38,10 @@ public class DFSFileReaderImpl implements FileReader { private boolean readPageByPage; - public DFSFileReaderImpl() { + private Configuration configuration; + + public DFSFileReaderImpl(Configuration configuration) { + this.configuration = configuration; this.fileNameAndStreamCache = new HashMap(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); } @@ -60,7 +64,7 @@ private FSDataInputStream updateCache(String filePath) throws IOException { FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath); if (null == fileChannel) { Path pt = new Path(filePath); - FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration()); + FileSystem fs = pt.getFileSystem(configuration); fileChannel = fs.open(pt); fileNameAndStreamCache.put(filePath, fileChannel); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java index c4761c977bf..937b5b66847 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java @@ -29,7 +29,7 @@ public class DefaultFileTypeProvider implements FileTypeInterface { - public FileReader getFileHolder(FileFactory.FileType fileType) { + public FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration) { switch (fileType) { case LOCAL: return new FileReaderImpl(); @@ -37,7 +37,7 @@ public FileReader getFileHolder(FileFactory.FileType fileType) { case ALLUXIO: case VIEWFS: case S3: - return new DFSFileReaderImpl(); + return new DFSFileReaderImpl(configuration); default: 
return new FileReaderImpl(); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index e3536236e33..b07d11bac2d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -59,11 +60,23 @@ private FileFactory() { } public static Configuration getConfiguration() { - return configuration; + Configuration conf; + Object confObject = ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo() + .getNonSerializableExtraInfo().get("carbonConf"); + if (confObject == null) { + conf = configuration; + } else { + conf = (Configuration) confObject; + } + return conf; } public static FileReader getFileHolder(FileType fileType) { - return fileFileTypeInterface.getFileHolder(fileType); + return fileFileTypeInterface.getFileHolder(fileType, getConfiguration()); + } + + public static FileReader getFileHolder(FileType fileType, Configuration configuration) { + return fileFileTypeInterface.getFileHolder(fileType, configuration); } public static FileType getFileType(String path) { @@ -100,7 +113,7 @@ public static DataInputStream getDataInputStream(String path, FileType fileType) public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize) throws IOException { - return getDataInputStream(path, fileType, bufferSize, configuration); + return getDataInputStream(path, fileType, bufferSize, getConfiguration()); } public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize, Configuration configuration) throws IOException { @@ -306,7 +319,7 @@ public static void truncateFile(String path, FileType fileType, long newSize) th // this method was new in hadoop 2.7, otherwise use CarbonFile.truncate to do this. 
try { Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); + FileSystem fs = pt.getFileSystem(getConfiguration()); Method truncateMethod = fs.getClass().getDeclaredMethod("truncate", new Class[]{Path.class, long.class}); truncateMethod.invoke(fs, new Object[]{pt, newSize}); @@ -414,7 +427,7 @@ public static long getDirectorySize(String filePath) throws IOException { case VIEWFS: case S3: Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); + FileSystem fs = path.getFileSystem(getConfiguration()); return fs.getContentSummary(path).getLength(); case LOCAL: default: @@ -442,7 +455,7 @@ public static Path getPath(String filePath) { * @throws IOException */ public static FileSystem getFileSystem(Path path) throws IOException { - return path.getFileSystem(configuration); + return path.getFileSystem(getConfiguration()); } @@ -455,7 +468,7 @@ public static void createDirectoryAndSetPermission(String directoryPath, FsPermi case VIEWFS: try { Path path = new Path(directoryPath); - FileSystem fs = path.getFileSystem(FileFactory.configuration); + FileSystem fs = path.getFileSystem(getConfiguration()); if (!fs.exists(path)) { fs.mkdirs(path); fs.setPermission(path, permission); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java index 358d2efa664..8b0fcc45056 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java @@ -24,7 +24,7 @@ public interface FileTypeInterface { - FileReader getFileHolder(FileFactory.FileType fileType); + FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration); CarbonFile getCarbonFile(String path, FileFactory.FileType fileType); CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration); } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java index b790f1ca185..2a9c7f4f80f 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java @@ -23,24 +23,26 @@ import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.hadoop.conf.Configuration; + /** * Factory class to get the query executor from RDD * This will return the executor based on query type */ public class QueryExecutorFactory { - public static QueryExecutor getQueryExecutor(QueryModel queryModel) { + public static QueryExecutor getQueryExecutor(QueryModel queryModel, Configuration configuration) { if (CarbonProperties.isSearchModeEnabled()) { if (queryModel.isVectorReader()) { - return new SearchModeVectorDetailQueryExecutor(); + return new SearchModeVectorDetailQueryExecutor(configuration); } else { - return new SearchModeDetailQueryExecutor(); + return new SearchModeDetailQueryExecutor(configuration); } } else { if (queryModel.isVectorReader()) { - return new VectorDetailQueryExecutor(); + return new VectorDetailQueryExecutor(configuration); } else { - return new DetailQueryExecutor(); + return new DetailQueryExecutor(configuration); } } } diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index 259889b634d..ece2f8da686 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -68,10 +68,12 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.carbondata.core.util.ThreadLocalTaskInfo; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.conf.Configuration; /** * This class provides a skeletal implementation of the {@link QueryExecutor} @@ -96,7 +98,8 @@ public abstract class AbstractQueryExecutor implements QueryExecutor { */ protected CarbonIterator queryIterator; - public AbstractQueryExecutor() { + public AbstractQueryExecutor(Configuration configuration) { + ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration); queryProperties = new QueryExecutorProperties(); } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java index 46ef43df424..e11c5767244 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java @@ -27,6 +27,8 @@ import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator; +import org.apache.hadoop.conf.Configuration; + /** * Below class will be used to execute the detail query * For executing the detail query it will pass all the block execution @@ -34,6 +36,10 @@ */ public class DetailQueryExecutor extends AbstractQueryExecutor { + public DetailQueryExecutor(Configuration configuration) { + super(configuration); + } + @Override public CarbonIterator execute(QueryModel queryModel) throws QueryExecutionException, IOException { diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java index ae14327c275..6d035402941 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java @@ -31,13 +31,15 @@ import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.hadoop.conf.Configuration; public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor { private static final LogService LOGGER = LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName()); private static ExecutorService executorService = null; - public SearchModeDetailQueryExecutor() { + public SearchModeDetailQueryExecutor(Configuration configuration) { + super(configuration); if (executorService == null) { initThreadPool(); } diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java index 705c451362e..418ef4275c3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java @@ -32,6 +32,8 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD; +import org.apache.hadoop.conf.Configuration; + /** * Below class will be used to execute the detail query and returns columnar vectors. */ @@ -40,7 +42,8 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor { + public VectorDetailQueryExecutor(Configuration configuration) { + super(configuration); + } + @Override public CarbonIterator execute(QueryModel queryModel) throws QueryExecutionException, IOException { diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index c3a4934fbcf..58fef1796e3 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -23,11 +23,11 @@ import java.io.IOException; import java.lang.reflect.Field; import java.text.SimpleDateFormat; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -94,7 +94,7 @@ public final class CarbonProperties { /** * It is purely for testing */ - private Map addedProperty = new HashMap<>(); + private Map addedProperty = new ConcurrentHashMap<>(); /** * Private constructor this will call load properties method to load all the @@ -407,7 +407,7 @@ private void validateLockType() { * @param lockTypeConfigured */ private void validateAndConfigureLockType(String lockTypeConfigured) { - Configuration configuration = new Configuration(true); + Configuration configuration = FileFactory.getConfiguration(); String defaultFs = configuration.get("fs.defaultFS"); if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) || defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 9aaa58c7841..c5e2e8d9e7c 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -134,8 +134,6 @@ public final class CarbonUtil { */ private static final int CONST_HUNDRED = 100; - private static final Configuration conf = new Configuration(true); - /** * dfs.bytes-per-checksum * HDFS checksum length, block size for a file should be exactly divisible @@ -662,7 +660,7 @@ public static String delimiterConverter(String delimiter) { */ public static String checkAndAppendHDFSUrl(String filePath) { String currentPath = filePath; - String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS); + String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS); String baseDFSUrl = 
CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL, ""); if (checkIfPrefixExists(filePath)) { @@ -699,7 +697,7 @@ public static String checkAndAppendFileSystemURIScheme(String filePath) { filePath = "/" + filePath; } currentPath = filePath; - String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS); + String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS); if (defaultFsUrl == null) { return currentPath; } diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java index 169c0033a05..51b157f8ec2 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.carbondata.common.constants.LoggerAction; import org.apache.carbondata.common.logging.LogService; @@ -57,12 +58,12 @@ public class SessionParams implements Serializable, Cloneable { private static final long serialVersionUID = -7801994600594915264L; private Map sProps; - private Map addedProps; + private ConcurrentHashMap addedProps; // below field to be used when we want the objects to be serialized private Map extraInfo; public SessionParams() { sProps = new HashMap<>(); - addedProps = new HashMap<>(); + addedProps = new ConcurrentHashMap<>(); extraInfo = new HashMap<>(); } diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java index df525bc24f5..f85a350e7d7 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.util; +import org.apache.hadoop.conf.Configuration; + /** * This class maintains ThreadLocal session params */ @@ -31,4 +33,22 @@ public static void setCarbonSessionInfo(CarbonSessionInfo carbonSessionInfo) { public static CarbonSessionInfo getCarbonSessionInfo() { return threadLocal.get(); } + + public static synchronized CarbonSessionInfo getOrCreateCarbonSessionInfo() { + CarbonSessionInfo info = threadLocal.get(); + if (info == null || info.getSessionParams() == null) { + info = new CarbonSessionInfo(); + info.setSessionParams(new SessionParams()); + threadLocal.set(info); + } + return info; + } + + public static void setConfigurationToCurrentThread(Configuration configuration) { + getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo().put("carbonConf", configuration); + } + + public static void unsetAll() { + threadLocal.remove(); + } } diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java index 30144c13c79..5033713e915 100644 --- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java @@ -26,6 +26,7 @@ import mockit.Mock; import mockit.MockUp; +import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,7 +46,7 @@ public class DFSFileReaderImplUnitTest { private static File fileWithEmptyContent; @BeforeClass public static void setup() { - dfsFileHolder = new DFSFileReaderImpl(); + dfsFileHolder = new DFSFileReaderImpl(new Configuration()); file = new File("Test.carbondata"); fileWithEmptyContent = new File("TestEXception.carbondata"); diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java index 605ec89803e..bdb17ed5085 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java @@ -165,7 +165,7 @@ public void onBlockletStart(int blockletId) throws IOException { // the indexWriter closes the FileSystem on closing the writer, so for a new configuration // and disable the cache for the index writer, it will be closed on closing the writer - Configuration conf = new Configuration(); + Configuration conf = FileFactory.getConfiguration(); conf.set("fs.hdfs.impl.disable.cache", "true"); // create a index writer diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index a54e7a46d70..0d38906236c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -33,6 +33,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -57,15 +58,16 @@ public class CarbonRecordReader extends AbstractRecordReader { private boolean skipClearDataMapAtClose = false; public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport readSupport, - InputMetricsStats inputMetricsStats) { - this(queryModel, readSupport); + InputMetricsStats inputMetricsStats, Configuration configuration) { + this(queryModel, readSupport, configuration); this.inputMetricsStats = inputMetricsStats; } - public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport readSupport) { + public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport readSupport, + Configuration configuration) { this.queryModel = queryModel; this.readSupport = readSupport; - this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration); } @Override diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 21ef6cfd06f..3ebd6d66c14 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -583,7 +583,8 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th Configuration configuration = taskAttemptContext.getConfiguration(); QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); CarbonReadSupport readSupport = getReadSupportClass(configuration); - return new 
CarbonRecordReader(queryModel, readSupport); + return new CarbonRecordReader(queryModel, readSupport, + taskAttemptContext.getConfiguration()); } public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index af2cf83bca0..ec201b9436a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -132,7 +132,6 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO @Override public List getSplits(JobContext job) throws IOException { AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); - CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); if (null == carbonTable) { throw new IOException("Missing/Corrupt schema file for table."); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 5938c200ecd..5cc275b88b4 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -33,6 +33,7 @@ import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.ObjectSerializationUtil; +import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; import org.apache.carbondata.processing.loading.DataLoadExecutor; import org.apache.carbondata.processing.loading.TableProcessingOperations; @@ -231,7 +232,7 @@ public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext contex @Override public RecordWriter getRecordWriter( - TaskAttemptContext taskAttemptContext) throws IOException { + final TaskAttemptContext taskAttemptContext) throws IOException { final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration()); //if loadModel having taskNo already(like in SDK) then no need to overwrite short sdkUserCore = loadModel.getSdkUserCores(); @@ -249,10 +250,12 @@ public RecordWriter getRecordWriter( final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext); final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor(); final ExecutorService executorService = Executors.newFixedThreadPool(1, - new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));; + new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName())); // It should be started in new thread as the underlying iterator uses blocking queue. 
Future future = executorService.submit(new Thread() { @Override public void run() { + ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskAttemptContext + .getConfiguration()); try { dataLoadExecutor .execute(loadModel, tempStoreLocations, iterators); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index af7397b0988..76414277a84 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -28,7 +28,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal; import org.apache.carbondata.core.datamap.DataMapJob; import org.apache.carbondata.core.datamap.DataMapUtil; -import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.exception.InvalidConfigurationException; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -177,20 +176,4 @@ public static JobID getJobId(java.util.Date date, int batch) { return new JobID(jobtrackerID, batch); } - public static void setS3Configurations(Configuration hadoopConf) { - FileFactory.getConfiguration() - .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", "")); - FileFactory.getConfiguration() - .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", "")); - FileFactory.getConfiguration() - .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", "")); - FileFactory.getConfiguration().set(CarbonCommonConstants.S3_ACCESS_KEY, - hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, "")); - FileFactory.getConfiguration().set(CarbonCommonConstants.S3_SECRET_KEY, - hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, "")); - FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_ACCESS_KEY, - hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, "")); - FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_SECRET_KEY, - hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, "")); - } } diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java index 57bcca3f823..4ed2b912348 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java @@ -63,7 +63,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport readSupport, InputSplit inputSplit, JobConf jobConf) throws IOException { - super(queryModel, readSupport); + super(queryModel, readSupport, jobConf); initialize(inputSplit, jobConf); } diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java index 5b15b221d5b..3ec815d126b 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java @@ -100,7 +100,8 @@ private PrestoCarbonVectorizedRecordReader createReader(ConnectorSplit split, checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this 
connector"); QueryModel queryModel = createQueryModel(carbondataSplit, columns); - QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + QueryExecutor queryExecutor = + QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration()); try { CarbonIterator iterator = queryExecutor.execute(queryModel); readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable()); diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java index 32e163a6a84..9935b5449bd 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java @@ -114,7 +114,8 @@ public PrestoCarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryMode queryModel.setTableBlockInfos(tableBlockInfoList); queryModel.setVectorReader(true); try { - queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + queryExecutor = + QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration()); iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel); } catch (QueryExecutionException e) { throw new InterruptedException(e.getMessage()); diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java index 7916932880c..5a1e1403c18 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java @@ -414,7 +414,7 @@ public List getInputSplits2(CarbonTableCacheModel ta List multiBlockSplitList = new ArrayList<>(); CarbonTable carbonTable = tableCacheModel.carbonTable; TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo(); - Configuration config = new Configuration(); + Configuration config = FileFactory.getConfiguration(); config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, ""); String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath(); config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath); diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala index 127c4c93136..2f3b8f4f0e4 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala @@ -32,6 +32,7 @@ import com.facebook.presto.tests.DistributedQueryRunner import com.google.common.collect.ImmutableMap import org.slf4j.{Logger, LoggerFactory} +import org.apache.carbondata.core.util.ThreadLocalSessionInfo import org.apache.carbondata.presto.CarbondataPlugin object PrestoServer { diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala index 0ac6e72f48c..6cbe747f9e6 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala @@ -41,8 +41,7 @@ class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAnd val n = 500000 sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() CarbonProperties - .getInstance() - .addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s") + .getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s") LuceneFineGrainDataMapSuite.createFile(file2, n) sql("create database if not exists lucene") sql("use lucene") diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala index 062e5ba5d13..c95e5a490d0 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala @@ -146,13 +146,13 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll { } test("test create table as select with TBLPROPERTIES") { - sql("DROP TABLE IF EXISTS ctas_tblproperties_test") + sql("DROP TABLE IF EXISTS ctas_tblproperties_testt") sql( - "create table ctas_tblproperties_test stored by 'carbondata' TBLPROPERTIES" + + "create table ctas_tblproperties_testt stored by 'carbondata' TBLPROPERTIES" + "('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test") - checkAnswer(sql("select * from ctas_tblproperties_test"), sql("select * from carbon_ctas_test")) + checkAnswer(sql("select * from ctas_tblproperties_testt"), sql("select * from carbon_ctas_test")) val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore - .lookupRelation(Option("default"), "ctas_tblproperties_test")(Spark2TestQueryExecutor.spark) + .lookupRelation(Option("default"), "ctas_tblproperties_testt")(Spark2TestQueryExecutor.spark) .asInstanceOf[CarbonRelation].carbonTable val metadataFolderPath: CarbonFile = FileFactory.getCarbonFile(carbonTable.getMetadataPath) assert(metadataFolderPath.exists()) @@ -419,7 +419,7 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS ctas_select_where_carbon") sql("DROP TABLE IF EXISTS ctas_select_where_parquet") sql("DROP TABLE IF EXISTS ctas_select_where_orc") - sql("DROP TABLE IF EXISTS ctas_tblproperties_test") + sql("DROP TABLE IF EXISTS ctas_tblproperties_testt") sql("DROP TABLE IF EXISTS ctas_if_table_name") sql("DROP TABLE IF EXISTS source_table") sql("DROP TABLE IF EXISTS target_table") diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala index 36d8c511a6f..8d6dd32ddcc 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala @@ -37,6 +37,8 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P import 
org.apache.spark.sql.util.SparkSQLUtil.sessionState import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.ThreadLocalSessionInfo import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.spark.rdd.SerializableConfiguration @@ -108,17 +110,18 @@ object CsvRDDHelper { closePartition() // 2. read function - val serializableConfiguration = new SerializableConfiguration(jobConf) + val serializableConfiguration = new SerializableConfiguration(hadoopConf) val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable { override def apply(file: PartitionedFile): Iterator[InternalRow] = { new Iterator[InternalRow] { - val hadoopConf = serializableConfiguration.value val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) formatter.format(new Date()) } + ThreadLocalSessionInfo.setConfigurationToCurrentThread(serializableConfiguration.value) val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0) - val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) + val hadoopAttemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, + attemptId) val inputSplit = new FileSplit(new Path(file.filePath), file.start, file.length, file.locations) var finished = false diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index dc238fb383c..be40b13ad69 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -29,11 +29,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.spark.rdd.SerializableConfiguration /** * Use sortBy operator in spark to load the data @@ -64,6 +66,7 @@ object DataLoadProcessBuilderOnSpark { val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") + val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) // 1. Input val inputRDD = originRDD .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast)) @@ -73,6 +76,7 @@ object DataLoadProcessBuilderOnSpark { // 2. 
Convert val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter) }.filter(_ != null)// Filter the bad record @@ -116,7 +120,7 @@ object DataLoadProcessBuilderOnSpark { // 4. Write sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast, - writeStepRowCounter)) + writeStepRowCounter, conf)) // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will // not have any functional impact as spark automatically monitors the cache usage on each node diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 73ed7694b29..f17bd9156d6 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -20,6 +20,7 @@ package org.apache.carbondata.spark.load import scala.util.Random import com.univocity.parsers.common.TextParsingException +import org.apache.hadoop.conf.Configuration import org.apache.spark.{Accumulator, SparkEnv, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.Row @@ -29,7 +30,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException import org.apache.carbondata.core.datastore.row.CarbonRow -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations} import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException @@ -40,7 +42,7 @@ import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImp import org.apache.carbondata.processing.sort.sortdata.SortParameters import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory} import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil} -import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow} +import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow} import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} object DataLoadProcessorStepOnSpark { @@ -227,7 +229,9 @@ object DataLoadProcessorStepOnSpark { rows: Iterator[CarbonRow], index: Int, modelBroadcast: Broadcast[CarbonLoadModel], - rowCounter: Accumulator[Int]) { + rowCounter: Accumulator[Int], + conf: Broadcast[SerializableConfiguration]) { + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) var model: CarbonLoadModel = null var tableName: String = null var rowConverter: RowConverterImpl = null diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala index 7c1edea40c8..f7aa62350cf 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala @@ -17,7 +17,8 @@ package org.apache.carbondata.spark.rdd -import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.SparkSession import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory @@ -47,15 +48,15 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par /** * This class is aimed at generating dictionary file for the newly added columns */ -class AlterTableAddColumnRDD[K, V](sc: SparkContext, +class AlterTableAddColumnRDD[K, V](@transient sparkSession: SparkSession, @transient newColumns: Seq[ColumnSchema], identifier: AbsoluteTableIdentifier) - extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) { + extends CarbonRDD[(Int, SegmentStatus)](sparkSession, Nil) { val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS) - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { newColumns.zipWithIndex.map { column => new AddColumnPartition(id, column._2, column._1) }.toArray diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala index 0dbb4f01e07..a0d06b87af4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala @@ -17,11 +17,12 @@ package org.apache.carbondata.spark.rdd -import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.statusmanager.SegmentStatus @@ -44,12 +45,12 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa /** * This class is aimed at generating dictionary file for the newly added columns */ -class AlterTableDropColumnRDD[K, V](sc: SparkContext, +class AlterTableDropColumnRDD[K, V](@transient ss: SparkSession, @transient newColumns: Seq[ColumnSchema], carbonTableIdentifier: AbsoluteTableIdentifier) - extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) { + extends CarbonRDD[(Int, SegmentStatus)](ss, Nil) { - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { newColumns.zipWithIndex.map { column => new 
DropColumnPartition(id, column._2, column._1) }.toArray diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala index 85a6f41d3e3..86a5043d743 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala @@ -39,7 +39,8 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel, partitionIds: Seq[String], bucketId: Int, identifier: AbsoluteTableIdentifier, - prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) { + prev: RDD[Array[AnyRef]]) + extends CarbonRDD[(K, V)](alterPartitionModel.sqlContext.sparkSession, prev) { var storeLocation: String = null val carbonLoadModel = alterPartitionModel.carbonLoadModel @@ -50,14 +51,14 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel, val factTableName = carbonTable.getTableName val partitionInfo = carbonTable.getPartitionInfo(factTableName) - override protected def getPartitions: Array[Partition] = { + override protected def internalGetPartitions: Array[Partition] = { val sc = alterPartitionModel.sqlContext.sparkContext sc.setLocalProperty("spark.scheduler.pool", "DDL") sc.setLocalProperty("spark.job.interruptOnCancel", "true") firstParent[Array[AnyRef]].partitions } - override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { + override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava val iter = new Iterator[(K, V)] { diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala index d56e1c2e160..e2d1effce61 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala @@ -21,7 +21,8 @@ import java.util import scala.collection.JavaConverters._ -import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.SparkSession import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.indexstore.PartitionSpec @@ -37,19 +38,19 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segment: Segment) /** * RDD to drop the partitions from segment files of all segments. 
- * @param sc + * @param ss * @param tablePath * @param segments segments to be cleaned */ class CarbonDropPartitionRDD( - sc: SparkContext, + @transient ss: SparkSession, tablePath: String, segments: Seq[Segment], partitions: util.List[PartitionSpec], uniqueId: String) - extends CarbonRDD[(String, String)](sc, Nil, sc.hadoopConfiguration) { + extends CarbonRDD[(String, String)](ss, Nil) { - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { segments.zipWithIndex.map {s => CarbonDropPartition(id, s._2, s._1) }.toArray diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala index 2ec8b9ccde3..9265c7ff3fa 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@ -31,7 +31,7 @@ import com.univocity.parsers.common.TextParsingException import org.apache.commons.lang3.{ArrayUtils, StringUtils} import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row +import org.apache.spark.sql.{Row, SparkSession} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} @@ -174,11 +174,12 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S * @param model a model package load info */ class CarbonAllDictionaryCombineRDD( + @transient sparkSession: SparkSession, prev: RDD[(String, Iterable[String])], model: DictionaryLoadModel) - extends CarbonRDD[(Int, ColumnDistinctValues)](prev) { + extends CarbonRDD[(Int, ColumnDistinctValues)](sparkSession, prev) { - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { firstParent[(String, Iterable[String])].partitions } @@ -267,11 +268,12 @@ class StringArrayRow(var values: Array[String]) extends Row { * @param model a model package load info */ class CarbonBlockDistinctValuesCombineRDD( + @transient ss: SparkSession, prev: RDD[Row], model: DictionaryLoadModel) - extends CarbonRDD[(Int, ColumnDistinctValues)](prev) { + extends CarbonRDD[(Int, ColumnDistinctValues)](ss, prev) { - override def getPartitions: Array[Partition] = firstParent[Row].partitions + override def internalGetPartitions: Array[Partition] = firstParent[Row].partitions override def internalCompute(split: Partition, context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -325,11 +327,14 @@ class CarbonBlockDistinctValuesCombineRDD( * @param model a model package load info */ class CarbonGlobalDictionaryGenerateRDD( + @transient sparkSession: SparkSession, prev: RDD[(Int, ColumnDistinctValues)], model: DictionaryLoadModel) - extends CarbonRDD[(Int, SegmentStatus)](prev) { + extends CarbonRDD[(Int, SegmentStatus)](sparkSession, prev) { + + override def internalGetPartitions: Array[Partition] = + firstParent[(Int, ColumnDistinctValues)].partitions - override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions override def internalCompute(split: Partition, context: TaskContext): Iterator[(Int, SegmentStatus)] = { @@ -492,21 +497,20 @@ class CarbonColumnDictPatition(id: Int, dimension: 
CarbonDimension) * Use external column dict to generate global dictionary * * @param carbonLoadModel carbon load model - * @param sparkContext spark context + * @param sparkSession spark session * @param table carbon table identifier * @param dimensions carbon dimenisons having predefined dict * @param dictFolderPath path of dictionary folder */ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel, dictionaryLoadModel: DictionaryLoadModel, - sparkContext: SparkContext, + @transient ss: SparkSession, table: CarbonTableIdentifier, dimensions: Array[CarbonDimension], dictFolderPath: String) - extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil, - sparkContext.hadoopConfiguration) { + extends CarbonRDD[(Int, ColumnDistinctValues)](ss, Nil) { - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { val primDimensions = dictionaryLoadModel.primDimensions val primDimLength = primDimensions.length val result = new Array[Partition](primDimLength) diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala index 3aaf0aecd0a..762b920ab3f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala @@ -24,14 +24,15 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job -import org.apache.spark.{Partition, SparkContext} +import org.apache.spark.Partition import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.command.CarbonMergerMapping -import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} -import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} +import org.apache.carbondata.hadoop.api.CarbonInputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.CarbonDataMergerUtil @@ -41,23 +42,23 @@ import org.apache.carbondata.spark.MergeResult * IUD carbon merger RDD * */ class CarbonIUDMergerRDD[K, V]( - sc: SparkContext, + @transient ss: SparkSession, result: MergeResult[K, V], carbonLoadModel: CarbonLoadModel, carbonMergerMapping: CarbonMergerMapping, confExecutorsTemp: String) - extends CarbonMergerRDD[K, V](sc, + extends CarbonMergerRDD[K, V](ss, result, carbonLoadModel, carbonMergerMapping, confExecutorsTemp) { - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { val startTime = System.currentTimeMillis() val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from( tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId) ) - val jobConf: JobConf = new JobConf(new Configuration) + val jobConf: JobConf = new JobConf(FileFactory.getConfiguration) SparkHadoopUtil.get.addCredentials(jobConf) val job: Job = new Job(jobConf) val format =
CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index f9f65a77efe..a0425b76ff2 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -41,6 +41,7 @@ import org.apache.carbondata.converter.SparkDataTypeConverterImpl import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.block._ +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.blocklet.DataFileFooter @@ -60,15 +61,15 @@ import org.apache.carbondata.spark.MergeResult import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util} class CarbonMergerRDD[K, V]( - sc: SparkContext, + @transient ss: SparkSession, result: MergeResult[K, V], carbonLoadModel: CarbonLoadModel, carbonMergerMapping: CarbonMergerMapping, confExecutorsTemp: String) - extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) { + extends CarbonRDD[(K, V)](ss, Nil) { - sc.setLocalProperty("spark.scheduler.pool", "DDL") - sc.setLocalProperty("spark.job.interruptOnCancel", "true") + ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL") + ss.sparkContext.setLocalProperty("spark.job.interruptOnCancel", "true") private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") var storeLocation: String = null @@ -183,7 +184,7 @@ class CarbonMergerRDD[K, V]( } try { // fire a query and get the results. 
- rawResultIteratorList = exec.processTableBlocks() + rawResultIteratorList = exec.processTableBlocks(FileFactory.getConfiguration) } catch { case e: Throwable => LOGGER.error(e) @@ -269,7 +270,7 @@ class CarbonMergerRDD[K, V]( iter } - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { val startTime = System.currentTimeMillis() val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from( tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId) @@ -277,7 +278,7 @@ class CarbonMergerRDD[K, V]( val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager( carbonTable) - val jobConf: JobConf = new JobConf(new Configuration) + val jobConf: JobConf = new JobConf(getConf) SparkHadoopUtil.get.addCredentials(jobConf) val job: Job = new Job(jobConf) val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala index 54a7530c554..04f20b1e4c0 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala @@ -17,28 +17,24 @@ package org.apache.carbondata.spark.rdd -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} - import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration -import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext} +import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.util.SparkSQLUtil -import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.util._ -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil /** * This RDD maintains session level ThreadLocal */ -abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext, - @transient private var deps: Seq[Dependency[_]], - @transient hadoopConf: Configuration) extends RDD[T](sc, deps) { +abstract class CarbonRDD[T: ClassTag](@transient ss: SparkSession, + @transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) { val carbonSessionInfo: CarbonSessionInfo = { var info = ThreadLocalSessionInfo.getCarbonSessionInfo @@ -50,24 +46,27 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext, info } - private val confBytes = { - val bao = new ByteArrayOutputStream() - val oos = new ObjectOutputStream(bao) - hadoopConf.write(oos) - oos.close() - CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray) - } + val config: Broadcast[SerializableConfiguration] = sparkContext + .broadcast(new SerializableConfiguration(SparkSQLUtil.sessionState(ss).newHadoopConf())) /** Construct an RDD with just a one-to-one dependency on one parent */ - def this(@transient oneParent: RDD[_]) = - this (oneParent.context, List(new OneToOneDependency(oneParent)), - oneParent.sparkContext.hadoopConfiguration) + def this(@transient 
sparkSession: SparkSession, @transient oneParent: RDD[_]) = + this (sparkSession, List(new OneToOneDependency(oneParent))) + + protected def internalGetPartitions: Array[Partition] + + override def getPartitions: Array[Partition] = { + ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value) + internalGetPartitions + } // RDD compute logic should be here def internalCompute(split: Partition, context: TaskContext): Iterator[T] final def compute(split: Partition, context: TaskContext): Iterator[T] = { - CarbonInputFormatUtil.setS3Configurations(getConf) + TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll()) + carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", config + .value.value) ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) TaskMetricsMap.threadLocal.set(Thread.currentThread().getId) val carbonTaskInfo = new CarbonTaskInfo @@ -79,13 +78,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext, } def getConf: Configuration = { - val configuration = new Configuration(false) - val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor - .unCompressByte(confBytes)) - val ois = new ObjectInputStream(bai) - configuration.readFields(ois) - ois.close() - configuration + config.value.value } } @@ -93,12 +86,14 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext, * This RDD contains TableInfo object which is serialized and deserialized in driver and executor */ abstract class CarbonRDDWithTableInfo[T: ClassTag]( - @transient sc: SparkContext, + @transient ss: SparkSession, @transient private var deps: Seq[Dependency[_]], - serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps, sc.hadoopConfiguration) { + serializedTableInfo: Array[Byte]) extends CarbonRDD[T](ss, deps) { - def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) = - this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo) + def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_], + serializedTableInfo: Array[Byte]) = { + this (sparkSession, List(new OneToOneDependency(oneParent)), serializedTableInfo) + } def getTableInfo: TableInfo = TableInfo.deserialize(serializedTableInfo) } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala index 9452777585c..241720a6985 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.command.AlterPartitionModel import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.sql.util.CarbonException @@ -36,6 +37,7 @@ import org.apache.spark.util.PartitionUtils import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.converter.SparkDataTypeConverterImpl import org.apache.carbondata.core.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo} +import org.apache.carbondata.core.datastore.impl.FileFactory import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.DataTypes @@ -65,7 +67,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel, absoluteTableIdentifier: AbsoluteTableIdentifier, partitionIds: Seq[String], bucketId: Int) - extends RDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkContext, Nil) { + extends CarbonRDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkSession, Nil) { private val queryId = alterPartitionModel.sqlContext.sparkContext.getConf .get("queryId", System.nanoTime() + "") @@ -91,9 +93,9 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel, val dictionaryIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]() val measureIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]() - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { val parallelism = sparkContext.defaultParallelism - val jobConf = new JobConf(new Configuration) + val jobConf = new JobConf(FileFactory.getConfiguration) val job = new Job(jobConf) val format = CarbonInputFormatUtil.createCarbonTableInputFormat(absoluteTableIdentifier, partitionIds.toList.asJava, job) @@ -127,8 +129,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel, result.toArray(new Array[Partition](result.size())) } - override def compute(split: Partition, context: TaskContext): - Iterator[(AnyRef, Array[AnyRef])] = { + override def internalCompute(split: Partition, context: TaskContext): + Iterator[(AnyRef, Array[AnyRef])] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) var exec : CarbonSplitExecutor = null val rows : java.util.List[(AnyRef, Array[AnyRef])] = new ArrayList[(AnyRef, Array[AnyRef])]() @@ -142,7 +144,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel, var result : java.util.List[PartitionSpliterRawResultIterator] = null try { exec = new CarbonSplitExecutor(segmentMapping, carbonTable) - result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl()) + result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl(), + FileFactory.getConfiguration) } catch { case e: Throwable => LOGGER.error(e) diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index e60d5b82aa9..f5d96fc8d6f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -44,6 +44,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.converter.SparkDataTypeConverterImpl import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal} import org.apache.carbondata.core.datastore.block.Distributable +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.TableInfo @@ -79,7 +80,7 @@ class CarbonScanRDD[T: ClassTag]( @transient val partitionNames: Seq[PartitionSpec], val dataTypeConverterClz: Class[_ <: DataTypeConverter] = 
classOf[SparkDataTypeConverterImpl], val readSupportClz: Class[_ <: CarbonReadSupport[_]] = SparkReadSupport.readSupportClass) - extends CarbonRDDWithTableInfo[T](spark.sparkContext, Nil, serializedTableInfo) { + extends CarbonRDDWithTableInfo[T](spark, Nil, serializedTableInfo) { private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") private val jobTrackerId: String = { @@ -92,7 +93,7 @@ class CarbonScanRDD[T: ClassTag]( @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { val startTime = System.currentTimeMillis() var partitions: Array[Partition] = Array.empty[Partition] var getSplitsStartTime: Long = -1 @@ -105,7 +106,7 @@ class CarbonScanRDD[T: ClassTag]( var numBlocks = 0 try { - val conf = new Configuration() + val conf = FileFactory.getConfiguration val jobConf = new JobConf(conf) SparkHadoopUtil.get.addCredentials(jobConf) val job = Job.getInstance(jobConf) @@ -405,7 +406,7 @@ class CarbonScanRDD[T: ClassTag]( val executionId = context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) val taskId = split.index val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) - val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) + val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId) val format = prepareInputFormatForExecutor(attemptContext.getConfiguration) val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value TaskMetricsMap.getInstance().registerThreadCallback() @@ -436,14 +437,16 @@ class CarbonScanRDD[T: ClassTag]( "true") if (carbonRecordReader == null) { new CarbonRecordReader(model, - format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats) + format.getReadSupportClass(attemptContext.getConfiguration), + inputMetricsStats, + attemptContext.getConfiguration) } else { carbonRecordReader } } else { new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration), - inputMetricsStats) + inputMetricsStats, attemptContext.getConfiguration) } } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 3848bada696..1ada51bf825 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext} import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD} import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.sql.Row +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.execution.command.ExecutionErrors import org.apache.spark.util.SparkUtil @@ -41,11 +41,11 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.common.logging.impl.StandardLogService import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.compression.CompressorFactory +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.datatype.DataTypes 
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalSessionInfo, ThreadLocalTaskInfo} import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations} import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator} import org.apache.carbondata.processing.loading.exception.NoRetryException @@ -160,29 +160,20 @@ class SparkPartitionLoader(model: CarbonLoadModel, * It loads the data to carbon using @AbstractDataLoadProcessorStep */ class NewCarbonDataLoadRDD[K, V]( - sc: SparkContext, + @transient ss: SparkSession, result: DataLoadResult[K, V], carbonLoadModel: CarbonLoadModel, - blocksGroupBy: Array[(String, Array[BlockDetails])], - @transient hadoopConf: Configuration) - extends CarbonRDD[(K, V)](sc, Nil, hadoopConf) { + blocksGroupBy: Array[(String, Array[BlockDetails])]) + extends CarbonRDD[(K, V)](ss, Nil) { - sc.setLocalProperty("spark.scheduler.pool", "DDL") + ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL") private val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") formatter.format(new Date()) } - private val confBytes = { - val bao = new ByteArrayOutputStream() - val oos = new ObjectOutputStream(bao) - hadoopConf.write(oos) - oos.close() - CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray) - } - - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { blocksGroupBy.zipWithIndex.map { b => new CarbonNodePartition(id, b._2, b._1._1, b._1._2) } @@ -245,10 +236,7 @@ class NewCarbonDataLoadRDD[K, V]( def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = { val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, theSplit.index, 0) - var configuration: Configuration = getConf - if (configuration == null) { - configuration = new Configuration() - } + val configuration: Configuration = FileFactory.getConfiguration CommonUtil.configureCSVInputFormat(configuration, carbonLoadModel) val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId) val format = new CSVInputFormat @@ -319,24 +307,13 @@ class NewCarbonDataLoadRDD[K, V]( * @see org.apache.carbondata.processing.newflow.DataLoadExecutor */ class NewDataFrameLoaderRDD[K, V]( - sc: SparkContext, + @transient ss: SparkSession, result: DataLoadResult[K, V], carbonLoadModel: CarbonLoadModel, - prev: DataLoadCoalescedRDD[Row], - @transient hadoopConf: Configuration) extends CarbonRDD[(K, V)](prev) { - - private val confBytes = { - val bao = new ByteArrayOutputStream() - val oos = new ObjectOutputStream(bao) - hadoopConf.write(oos) - oos.close() - CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray) - } + prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](ss, prev) { override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val hadoopConf = getConf - CarbonInputFormatUtil.setS3Configurations(hadoopConf) val iter = new Iterator[(K, V)] { val loadMetadataDetails = new LoadMetadataDetails() val 
executionErrors = new ExecutionErrors(FailureCauses.NONE, "") @@ -404,7 +381,7 @@ class NewDataFrameLoaderRDD[K, V]( } iter } - override protected def getPartitions: Array[Partition] = firstParent[Row].partitions + override protected def internalGetPartitions: Array[Partition] = firstParent[Row].partitions } /** @@ -528,10 +505,10 @@ class LazyRddIterator(serializer: SerializerInstance, * @see org.apache.carbondata.processing.newflow.DataLoadExecutor */ class PartitionTableDataLoaderRDD[K, V]( - sc: SparkContext, + @transient ss: SparkSession, result: DataLoadResult[K, V], carbonLoadModel: CarbonLoadModel, - prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) { + prev: RDD[Row]) extends CarbonRDD[(K, V)](ss, prev) { override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -596,6 +573,6 @@ class PartitionTableDataLoaderRDD[K, V]( iter } - override protected def getPartitions: Array[Partition] = firstParent[Row].partitions + override protected def internalGetPartitions: Array[Partition] = firstParent[Row].partitions } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala index 43ee31be5f3..b8e73d5576a 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala @@ -25,9 +25,12 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskAttemptID, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException} +import org.apache.spark.{Partition, TaskContext, TaskKilledException} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat} +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.ExtendedBlocklet import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf @@ -38,8 +41,8 @@ class SparkDataMapJob extends AbstractDataMapJob { override def execute(dataMapFormat: DistributableDataMapFormat, filter: FilterResolverIntf): util.List[ExtendedBlocklet] = { - new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, filter).collect().toList - .asJava + new DataMapPruneRDD(SparkSQLUtil.getSparkSession, dataMapFormat, filter).collect() + .toList.asJava } } @@ -51,13 +54,13 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte /** * RDD to prune the datamaps across spark cluster - * @param sc + * @param ss * @param dataMapFormat */ -class DataMapPruneRDD(sc: SparkContext, +class DataMapPruneRDD(@transient ss: SparkSession, dataMapFormat: DistributableDataMapFormat, resolverIntf: FilterResolverIntf) - extends CarbonRDD[(ExtendedBlocklet)](sc, Nil, sc.hadoopConfiguration) { + extends CarbonRDD[(ExtendedBlocklet)](ss, Nil) { private val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") @@ -67,7 +70,7 @@ class DataMapPruneRDD(sc: SparkContext, override def internalCompute(split: Partition, context: TaskContext): Iterator[ExtendedBlocklet] = { val attemptId = new 
TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) - val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) + val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId) val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext) reader.initialize(inputSplit, attemptContext) @@ -102,8 +105,8 @@ class DataMapPruneRDD(sc: SparkContext, iter } - override protected def getPartitions: Array[Partition] = { - val job = Job.getInstance(new Configuration()) + override protected def internalGetPartitions: Array[Partition] = { + val job = Job.getInstance(FileFactory.getConfiguration) val splits = dataMapFormat.getSplits(job) splits.asScala.zipWithIndex.map(f => new DataMapRDDPartition(id, f._2, f._1)).toArray } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala index f7bbf06d02c..39e18751ae5 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala @@ -98,11 +98,10 @@ class StreamingRawResultIterator( * execute streaming segment handoff */ class StreamHandoffRDD[K, V]( - sc: SparkContext, + @transient ss: SparkSession, result: HandoffResult[K, V], carbonLoadModel: CarbonLoadModel, - handOffSegmentId: String -) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) { + handOffSegmentId: String) extends CarbonRDD[(K, V)](ss, Nil) { private val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") @@ -111,8 +110,7 @@ class StreamHandoffRDD[K, V]( override def internalCompute( split: Partition, - context: TaskContext - ): Iterator[(K, V)] = { + context: TaskContext): Iterator[(K, V)] = { carbonLoadModel.setTaskNo("" + split.index) val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) @@ -148,7 +146,7 @@ class StreamHandoffRDD[K, V]( ): util.ArrayList[RawResultIterator] = { val inputSplit = split.asInstanceOf[HandoffPartition].split.value val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) - val hadoopConf = new Configuration() + val hadoopConf = getConf CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName) CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName) CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath) @@ -200,7 +198,7 @@ class StreamHandoffRDD[K, V]( /** * get the partitions of the handoff segment */ - override protected def getPartitions: Array[Partition] = { + override protected def internalGetPartitions: Array[Partition] = { val job = Job.getInstance(FileFactory.getConfiguration) val inputFormat = new CarbonTableInputFormat[Array[Object]]() val segmentList = new util.ArrayList[Segment](1) @@ -323,7 +321,7 @@ object StreamHandoffRDD { // convert a streaming segment to columnar segment val status = new StreamHandoffRDD( - sparkSession.sparkContext, + sparkSession, new HandoffResultImpl(), carbonLoadModel, handoffSegmenId).collect() diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 1cd4d773d95..e79c63b0fd8 
100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.{SparkContext, SparkEnv} import org.apache.spark.rdd.CarbonMergeFilesRDD -import org.apache.spark.sql.{Row, RowFactory} +import org.apache.spark.sql.{Row, RowFactory, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField} @@ -839,7 +839,7 @@ object CommonUtil { * which do not store the blocklet info to current * version */ - def mergeIndexFiles(sparkContext: SparkContext, + def mergeIndexFiles(sparkSession: SparkSession, segmentIds: Seq[String], segmentFileNameToSegmentIdMap: java.util.Map[String, String], tablePath: String, @@ -848,7 +848,7 @@ object CommonUtil { readFileFooterFromCarbonDataFile: Boolean = false): Unit = { if (mergeIndexProperty) { new CarbonMergeFilesRDD( - sparkContext, + sparkSession, carbonTable, segmentIds, segmentFileNameToSegmentIdMap, @@ -860,7 +860,7 @@ object CommonUtil { CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) { new CarbonMergeFilesRDD( - sparkContext, + sparkSession, carbonTable, segmentIds, segmentFileNameToSegmentIdMap, @@ -871,7 +871,7 @@ object CommonUtil { case _: Exception => if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) { new CarbonMergeFilesRDD( - sparkContext, + sparkSession, carbonTable, segmentIds, segmentFileNameToSegmentIdMap, diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index 1bb391232d9..67c4c9be8ca 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -461,9 +461,11 @@ object GlobalDictionaryUtil { dictFolderPath, forPreDefDict = true) // new RDD to achieve distributed column dict generation val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel, - sqlContext.sparkContext, table, dimensions, dictFolderPath) + sqlContext.sparkSession, table, dimensions, dictFolderPath) .partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length)) - val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect() + val statusList = new CarbonGlobalDictionaryGenerateRDD(sqlContext.sparkSession, extInputRDD, + dictLoadModel) + .collect() // check result status checkStatus(carbonLoadModel, sqlContext, dictLoadModel, statusList) } @@ -670,10 +672,13 @@ object GlobalDictionaryUtil { val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier, requireDimension, dictfolderPath, false) // combine distinct value in a block and partition by column - val inputRDD = new CarbonBlockDistinctValuesCombineRDD(dictRdd, model) + val inputRDD = new CarbonBlockDistinctValuesCombineRDD(sqlContext.sparkSession, dictRdd, + model) .partitionBy(new ColumnPartitioner(model.primDimensions.length)) // generate global dictionary files - val statusList 
= new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect() + val statusList = new CarbonGlobalDictionaryGenerateRDD(sqlContext.sparkSession, + inputRDD, model) + .collect() // check result status checkStatus(carbonLoadModel, sqlContext, model, statusList) } else { @@ -731,10 +736,13 @@ object GlobalDictionaryUtil { val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers, requireColumnNames, allDictionaryPathAppended, accumulator) // read exist dictionary and combine - val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model) + val inputRDD = new CarbonAllDictionaryCombineRDD(sqlContext.sparkSession, + allDictionaryRdd, model) .partitionBy(new ColumnPartitioner(model.primDimensions.length)) // generate global dictionary files - val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect() + val statusList = new CarbonGlobalDictionaryGenerateRDD(sqlContext.sparkSession, inputRDD, + model) + .collect() // check result status checkStatus(carbonLoadModel, sqlContext, model, statusList) // if the dictionary contains wrong format record, throw ex diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala index 1acdf7ec985..b5147f0b686 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala @@ -17,7 +17,8 @@ package org.apache.spark.rdd -import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.SparkSession import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.path.CarbonTablePath @@ -35,20 +36,20 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String) /** * RDD to merge all carbonindex files of each segment to carbonindex file into the same segment. 
- * @param sc + * @param ss * @param carbonTable * @param segments segments to be merged */ class CarbonMergeFilesRDD( - sc: SparkContext, + @transient ss: SparkSession, carbonTable: CarbonTable, segments: Seq[String], segmentFileNameToSegmentIdMap: java.util.Map[String, String], isHivePartitionedTable: Boolean, readFileFooterFromCarbonDataFile: Boolean) - extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) { + extends CarbonRDD[String](ss, Nil) { - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { segments.zipWithIndex.map {s => CarbonMergeFilePartition(id, s._2, s._1) }.toArray diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala index 6a97477c6c8..2854c9123f1 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala @@ -20,6 +20,7 @@ package org.apache.spark.rdd import scala.reflect.ClassTag import org.apache.spark._ +import org.apache.spark.sql.SparkSession import org.apache.carbondata.spark.rdd.CarbonRDD @@ -27,12 +28,12 @@ import org.apache.carbondata.spark.rdd.CarbonRDD case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition) class DataLoadCoalescedRDD[T: ClassTag]( + @transient sparkSession: SparkSession, @transient var prev: RDD[T], nodeList: Array[String]) - extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil, - prev.sparkContext.hadoopConfiguration) { + extends CarbonRDD[DataLoadPartitionWrap[T]](sparkSession, Nil) { - override def getPartitions: Array[Partition] = { + override def internalGetPartitions: Array[Partition] = { new DataLoadPartitionCoalescer(prev, nodeList).run } diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 1b48c08cdec..da226589d6b 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -117,7 +117,7 @@ case class CarbonMergerMapping( var maxSegmentColCardinality: Array[Int], // maxSegmentColumnSchemaList is list of column schema of last segment of compaction var maxSegmentColumnSchemaList: List[ColumnSchema], - currentPartitions: Option[Seq[PartitionSpec]]) + @transient currentPartitions: Option[Seq[PartitionSpec]]) case class NodeInfo(TaskId: String, noOfBlocks: Int) diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala index 2e31a82d3f6..b5fda853357 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala @@ -27,4 +27,8 @@ object SparkSQLUtil { def execute(logicalPlan: LogicalPlan, sparkSession: SparkSession): DataFrame = { Dataset.ofRows(sparkSession, logicalPlan) } + + def getSparkSession: SparkSession = { + SparkSession.getDefaultSession.get + } } diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala 
b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala index fdbf4004820..e2aa9ae62b9 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala @@ -159,7 +159,7 @@ object PartitionUtils { partitionIds: List[String], oldPartitionIdList: List[Int], partitionInfo: PartitionInfo, carbonTable: CarbonTable): java.util.List[TableBlockInfo] = { - val jobConf = new JobConf(new Configuration) + val jobConf = new JobConf(FileFactory.getConfiguration) val job = new Job(jobConf) val format = CarbonInputFormatUtil .createCarbonTableInputFormat(identifier, partitionIds.asJava, job) diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java index 2c53d2037ce..f23755248e4 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java +++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java @@ -136,7 +136,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont queryModel.setTableBlockInfos(tableBlockInfoList); queryModel.setVectorReader(true); try { - queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + queryExecutor = + QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration()); iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel); } catch (QueryExecutionException e) { if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) { diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala index d321cab1289..81b395e5850 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala @@ -341,7 +341,8 @@ class SparkCarbonFileFormat extends FileFormat split.setDetailInfo(info) info.setBlockSize(file.length) // Read the footer offset and set. 
- val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath)) + val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath), + broadcastedHadoopConf.value.value) val buffer = reader .readByteBuffer(FileFactory.getUpdatedFilePath(file.filePath), file.length - 8, 8) info.setBlockFooterOffset(buffer.getLong) diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala index 8309065095b..82c64a457de 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala @@ -292,14 +292,13 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar } class IndexDataMapRebuildRDD[K, V]( - session: SparkSession, + @transient session: SparkSession, result: RefreshResult[K, V], @transient tableInfo: TableInfo, dataMapName: String, indexColumns: Array[CarbonColumn], - segments: Set[Segment] -) extends CarbonRDDWithTableInfo[(K, V)]( - session.sparkContext, Nil, tableInfo.serialize()) { + segments: Set[Segment]) + extends CarbonRDDWithTableInfo[(K, V)](session, Nil, tableInfo.serialize()) { private val dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName) private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") @@ -323,7 +322,7 @@ class IndexDataMapRebuildRDD[K, V]( inputMetrics.initBytesReadCallback(context, inputSplit) val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) - val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) + val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId) val format = createInputFormat(segment.get, attemptContext) val model = format.createQueryModel(inputSplit, attemptContext) @@ -351,7 +350,8 @@ class IndexDataMapRebuildRDD[K, V]( } else { new OriginalReadSupport(indexColumns.map(_.getDataType)) } - reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics) + reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics, + attemptContext.getConfiguration) reader.initialize(inputSplit, attemptContext) // skip clear datamap and we will do this adter rebuild reader.setSkipClearDataMapAtClose(true) @@ -439,11 +439,11 @@ class IndexDataMapRebuildRDD[K, V]( format } - override protected def getPartitions = { + override protected def internalGetPartitions = { if (!dataMapSchema.isIndexDataMap) { throw new UnsupportedOperationException } - val conf = new Configuration() + val conf = FileFactory.getConfiguration val jobConf = new JobConf(conf) SparkHadoopUtil.get.addCredentials(jobConf) val job = Job.getInstance(jobConf) diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 6e322b3767c..0fd4e344a10 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -62,9 +62,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.scan.partition.PartitionUtil import 
org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} -import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalSessionInfo} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable} @@ -329,7 +330,8 @@ object CarbonDataRDDFactory { dataFrame, carbonLoadModel, updateModel, - carbonTable) + carbonTable, + hadoopConf) res.foreach { resultOfSeg => resultOfSeg.foreach { resultOfBlock => if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) { @@ -680,7 +682,8 @@ object CarbonDataRDDFactory { dataFrame: Option[DataFrame], carbonLoadModel: CarbonLoadModel, updateModel: Option[UpdateTableModel], - carbonTable: CarbonTable): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = { + carbonTable: CarbonTable, + hadoopConf: Configuration): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = { val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate val updateRdd = dataFrame.get.rdd @@ -720,7 +723,9 @@ object CarbonDataRDDFactory { // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) val partitionId = TaskContext.getPartitionId() val segIdIndex = partitionId / segmentUpdateParallelism val randomPart = partitionId - segIdIndex * segmentUpdateParallelism @@ -1070,7 +1075,7 @@ object CarbonDataRDDFactory { try { val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel, hadoopConf) new PartitionTableDataLoaderRDD( - sqlContext.sparkContext, + sqlContext.sparkSession, new DataLoadResultImpl(), carbonLoadModel, rdd @@ -1099,14 +1104,14 @@ object CarbonDataRDDFactory { val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList( nodeNumOfData, sqlContext.sparkContext) - val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct) + val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray + .distinct) new NewDataFrameLoaderRDD( - sqlContext.sparkContext, + sqlContext.sparkSession, new DataLoadResultImpl(), carbonLoadModel, - newRdd, - sqlContext.sparkContext.hadoopConfiguration + newRdd ).collect() } catch { case ex: Exception => @@ -1207,11 +1212,10 @@ object CarbonDataRDDFactory { }.toArray new NewCarbonDataLoadRDD( - sqlContext.sparkContext, + sqlContext.sparkSession, new DataLoadResultImpl(), carbonLoadModel, - blocksGroupBy, - hadoopConf + blocksGroupBy ).collect() } } diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index fcc649eae8e..d9884e1175c 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -186,7 +186,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, val mergeStatus = if (CompactionType.IUD_UPDDEL_DELTA == 
compactionType) { new CarbonIUDMergerRDD( - sc.sparkContext, + sc.sparkSession, new MergeResultImpl(), carbonLoadModel, carbonMergerMapping, @@ -194,7 +194,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, ).collect } else { new CarbonMergerRDD( - sc.sparkContext, + sc.sparkSession, new MergeResultImpl(), carbonLoadModel, carbonMergerMapping, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala index 662224646b1..9b78db0296e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala @@ -31,9 +31,11 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.util.ThreadLocalSessionInfo import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil @@ -44,6 +46,8 @@ case class CarbonCountStar( outUnsafeRows: Boolean = true) extends LeafExecNode { override def doExecute(): RDD[InternalRow] = { + ThreadLocalSessionInfo + .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf()) val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier val (job, tableInputFormat) = createCarbonInputFormat(absoluteTableIdentifier) CarbonInputFormat.setQuerySegment(job.getConfiguration, absoluteTableIdentifier) @@ -73,7 +77,7 @@ case class CarbonCountStar( private def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier ): (Job, CarbonTableInputFormat[Array[Object]]) = { val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]() - val jobConf: JobConf = new JobConf(new Configuration) + val jobConf: JobConf = new JobConf(FileFactory.getConfiguration) SparkHadoopUtil.get.addCredentials(jobConf) CarbonInputFormat.setTableInfo(jobConf, carbonTable.getTableInfo) val job = new Job(jobConf) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index b5842a9d7e4..8a0404c13cf 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -56,7 +56,6 @@ case class CarbonDatasourceHadoopRelation( caseInsensitiveMap("tablename")) lazy val databaseName: String = carbonTable.getDatabaseName lazy val tableName: String = carbonTable.getTableName - CarbonInputFormatUtil.setS3Configurations(sparkSession.sessionState.newHadoopConf()) CarbonSession.updateSessionInfoToCurrentThread(sparkSession) @transient lazy val carbonRelation: CarbonRelation = diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index 34a8dce5753..d0ed56e68ec 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -41,9 +41,10 @@ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.scan.executor.util.QueryUtil -import org.apache.carbondata.core.util.DataTypeUtil +import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.spark.CarbonAliasDecoderRelation -import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo +import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, SerializableConfiguration} /** * It decodes the data. @@ -75,9 +76,12 @@ case class CarbonDictionaryDecoder( (carbonTable.getTableName, carbonTable) }.toMap + val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession + .sessionState.newHadoopConf())) if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) { val dataTypes = child.output.map { attr => attr.dataType } child.execute().mapPartitions { iter => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) val cacheProvider: CacheProvider = CacheProvider.getInstance val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY) @@ -439,7 +443,9 @@ class CarbonDecoderRDD( val prev: RDD[InternalRow], output: Seq[Attribute], serializedTableInfo: Array[Byte]) - extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) { + extends CarbonRDDWithTableInfo[InternalRow](relations.head.carbonRelation.sparkSession, + prev, + serializedTableInfo) { def canBeDecoded(attr: Attribute): Boolean = { profile match { @@ -543,7 +549,8 @@ class CarbonDecoderRDD( dicts } - override protected def getPartitions: Array[Partition] = firstParent[InternalRow].partitions + override protected def internalGetPartitions: Array[Partition] = + firstParent[InternalRow].partitions } /** diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 7f268883c1d..8253c4d24f1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -17,11 +17,8 @@ package org.apache.spark.sql -import java.io.File import java.util.concurrent.ConcurrentHashMap -import scala.util.Try - import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog @@ -29,7 +26,6 @@ import org.apache.spark.sql.events.MergeIndexEventListener import org.apache.spark.sql.execution.command.preaaggregate._ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction import org.apache.spark.sql.hive._ -import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -101,6 +97,8 @@ class CarbonEnv { threadLevelCarbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams) } ThreadLocalSessionInfo.setCarbonSessionInfo(threadLevelCarbonSessionInfo) + ThreadLocalSessionInfo.setConfigurationToCurrentThread(sparkSession + .sessionState.newHadoopConf()) val config 
= new CarbonSQLConf(sparkSession) if (sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT).isEmpty) { config.addDefaultCarbonParams() diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index c59bb086362..5af64ffe7c1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -40,7 +40,6 @@ import org.apache.carbondata.common.annotations.InterfaceAudience import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo} -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.store.SparkCarbonStore import org.apache.carbondata.streaming.CarbonStreamingQueryListener @@ -432,6 +431,8 @@ object CarbonSession { } // preserve thread parameters across call ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) + ThreadLocalSessionInfo + .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf()) } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index b162294b9ed..693a8c4897f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -329,7 +329,6 @@ object CarbonSource { .contains("true") tableInfo.setTransactionalTable(isTransactionalTable) if (isTransactionalTable && !metaStore.isReadFromHiveMetaStore) { - CarbonInputFormatUtil.setS3Configurations(sparkSession.sessionState.newHadoopConf()) // save to disk metaStore.saveToDisk(tableInfo, properties("tablePath")) // remove schema string from map as we don't store carbon schema to hive metastore diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala index 5bff9aa3004..24ef0db0ece 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala @@ -54,7 +54,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { val sparkSession = SparkSession.getActiveSession.get if(!carbonTable.isStreamingSink) { if (null != compactedSegments && !compactedSegments.isEmpty) { - mergeIndexFilesForCompactedSegments(sparkSession.sparkContext, + mergeIndexFilesForCompactedSegments(sparkSession, carbonTable, compactedSegments) } else { @@ -63,7 +63,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { segmentFileNameMap .put(loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) - CommonUtil.mergeIndexFiles(sparkSession.sparkContext, + CommonUtil.mergeIndexFiles(sparkSession, Seq(loadModel.getSegmentId), segmentFileNameMap, carbonTable.getTablePath, @@ -77,9 +77,9 @@ class MergeIndexEventListener extends OperationEventListener with Logging { val alterTableCompactionPostEvent = event.asInstanceOf[AlterTableCompactionPostEvent] val carbonTable = alterTableCompactionPostEvent.carbonTable val mergedLoads = 
alterTableCompactionPostEvent.compactedLoads - val sparkContext = alterTableCompactionPostEvent.sparkSession.sparkContext + val sparkSession = alterTableCompactionPostEvent.sparkSession if(!carbonTable.isStreamingSink) { - mergeIndexFilesForCompactedSegments(sparkContext, carbonTable, mergedLoads) + mergeIndexFilesForCompactedSegments(sparkSession, carbonTable, mergedLoads) } case alterTableMergeIndexEvent: AlterTableMergeIndexEvent => val exceptionEvent = event.asInstanceOf[AlterTableMergeIndexEvent] @@ -123,7 +123,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { // store (store <= 1.1 version) and create merge Index file as per new store so that // old store is also upgraded to new store CommonUtil.mergeIndexFiles( - sparkContext = sparkSession.sparkContext, + sparkSession = sparkSession, segmentIds = validSegmentIds, segmentFileNameToSegmentIdMap = segmentFileNameMap, tablePath = carbonMainTable.getTablePath, @@ -155,7 +155,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { } } - def mergeIndexFilesForCompactedSegments(sparkContext: SparkContext, + def mergeIndexFilesForCompactedSegments(sparkSession: SparkSession, carbonTable: CarbonTable, mergedLoads: util.List[String]): Unit = { // get only the valid segments of the table @@ -182,7 +182,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { val validMergedSegIds = validSegments .filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo) if (null != validMergedSegIds && !validMergedSegIds.isEmpty) { - CommonUtil.mergeIndexFiles(sparkContext, + CommonUtil.mergeIndexFiles(sparkSession, validMergedSegIds, segmentFileNameMap, carbonTable.getTablePath, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala index 6c74ad2ba6b..7cf8c1e8ea6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala @@ -18,15 +18,13 @@ package org.apache.spark.sql.execution.command.management import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand} import org.apache.spark.storage.StorageLevel import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.util.CarbonSparkUtil +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} case class CarbonInsertIntoCommand( relation: CarbonDatasourceHadoopRelation, @@ -45,6 +43,9 @@ case class CarbonInsertIntoCommand( case other => false } isDefined } + + ThreadLocalSessionInfo + .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf()) val isPersistEnabledUserValue = CarbonProperties.getInstance .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED, CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT) diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 460d7e6e58f..516f9af0f42 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.management import java.text.SimpleDateFormat import java.util -import java.util.{List, UUID} +import java.util.UUID import scala.collection.JavaConverters._ import scala.collection.mutable @@ -33,7 +33,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Expression, GenericInternalRow, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort} import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY @@ -59,16 +59,16 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICar import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} -import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil, ObjectSerializationUtil} +import org.apache.carbondata.core.util._ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.exception.NoRetryException import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -77,8 +77,8 @@ import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataPro import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark} -import 
org.apache.carbondata.spark.rdd.CarbonDataRDDFactory -import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil} +import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, SerializableConfiguration} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil} case class CarbonLoadDataCommand( databaseNameOp: Option[String], @@ -977,8 +977,11 @@ case class CarbonLoadDataCommand( array } } - val finalRDD = convertRDD.mapPartitionsWithIndex {case(index, rows) => + val conf = sparkSession.sparkContext + .broadcast(new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())) + val finalRDD = convertRDD.mapPartitionsWithIndex { case(index, rows) => DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) DataLoadProcessorStepOnSpark.inputAndconvertFunc( rows, index, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index 863324337f3..b77632d40eb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -41,13 +41,15 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum} import org.apache.carbondata.core.mutate.data.RowCountDetailsVO import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager} -import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.exception.MultipleMatchingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.spark.DeleteDelataResultImpl +import org.apache.carbondata.spark.rdd.SerializableConfiguration object DeleteExecution { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) @@ -118,17 +120,18 @@ object DeleteExecution { blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq, keyRdd.partitions.length) + val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession + .sessionState.newHadoopConf())) + val rdd = rowContRdd.join(keyRdd) res = rdd.mapPartitionsWithIndex( (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) => Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] { - + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]() while (records.hasNext) { val ((key), (rowCountDetailsVO, groupedRows)) = records.next val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR)) - val loadDetail = - metadataDetails.find(_.getLoadName.equals(segmentId)).get result = result ++ deleteDeltaFunc(index, key, @@ -138,8 +141,7 @@ object DeleteExecution { 
isStandardTable) } result - } - ).collect() + }).collect() // if no loads are present then no need to do anything. if (res.flatten.isEmpty) { @@ -328,7 +330,7 @@ object DeleteExecution { private def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) : (CarbonTableInputFormat[Array[Object]], Job) = { val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]() - val jobConf: JobConf = new JobConf(new Configuration) + val jobConf: JobConf = new JobConf(FileFactory.getConfiguration) val job: Job = new Job(jobConf) FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) (carbonInputFormat, job) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala index 8c88d0ece0c..66066ed9241 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala @@ -32,7 +32,10 @@ import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager +import org.apache.carbondata.core.util.{ThreadLocalSessionInfo} +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType} +import org.apache.carbondata.spark.rdd.SerializableConfiguration object HorizontalCompaction { @@ -188,8 +191,11 @@ object HorizontalCompaction { val timestamp = factTimeStamp val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails + val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession + .sessionState.newHadoopConf())) val result = rdd1.mapPartitions(iter => new Iterator[Seq[CarbonDataMergerUtilResult]] { + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) override def hasNext: Boolean = iter.hasNext override def next(): Seq[CarbonDataMergerUtilResult] = { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala index 45cacfaa38f..1e987b0b323 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala @@ -173,7 +173,7 @@ case class CarbonAlterTableDropHivePartitionCommand( val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier) .getValidAndInvalidSegments.getValidSegments // First drop the partitions from partition mapper files of each segment - val tuples = new CarbonDropPartitionRDD(sparkSession.sparkContext, + val tuples = new CarbonDropPartitionRDD(sparkSession, table.getTablePath, segments.asScala, carbonPartitionsTobeDropped, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala index 7e11170a2b6..22ff5c42397 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala @@ -82,7 +82,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand( carbonTable.getAbsoluteTableIdentifier, sparkSession.sparkContext).process // generate dictionary files for the newly added columns - new AlterTableAddColumnRDD(sparkSession.sparkContext, + new AlterTableAddColumnRDD(sparkSession, newCols, carbonTable.getAbsoluteTableIdentifier).collect() timeStamp = System.currentTimeMillis @@ -110,7 +110,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand( LOGGER.error(e, "Alter table add columns failed") if (newCols.nonEmpty) { LOGGER.info("Cleaning up the dictionary files as alter table add operation failed") - new AlterTableDropColumnRDD(sparkSession.sparkContext, + new AlterTableDropColumnRDD(sparkSession, newCols, carbonTable.getAbsoluteTableIdentifier).collect() AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala index 23dbf9ed768..1dbe28c866e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala @@ -149,7 +149,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand( sparkSession.catalog.refreshTable(tableIdentifier.quotedString) // TODO: 1. 
add check for deletion of index tables // delete dictionary files for dictionary column and clear dictionary cache from memory - new AlterTableDropColumnRDD(sparkSession.sparkContext, + new AlterTableDropColumnRDD(sparkSession, dictionaryColumns, carbonTable.getAbsoluteTableIdentifier).collect() diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index c403d522bd5..1beda112f1d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -32,7 +32,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo} import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.spark.util.CarbonSparkUtil @@ -49,7 +49,9 @@ case class CarbonCreateTableCommand( val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val tableName = tableInfo.getFactTable.getTableName var databaseOpt : Option[String] = None - if(tableInfo.getDatabaseName != null) { + ThreadLocalSessionInfo + .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf()) + if (tableInfo.getDatabaseName != null) { databaseOpt = Some(tableInfo.getDatabaseName) } val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 94b988c1357..f4fb90af46c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.{CarbonReflectionUtils, FileUtils} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} /** * Carbon strategies for ddl commands @@ -91,6 +91,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { case e: NoSuchDatabaseException => CarbonProperties.getStorePath } + ThreadLocalSessionInfo + .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf()) FileUtils.createDatabaseDirectory(dbName, dbLocation, sparkSession.sparkContext) ExecutedCommandExec(createDb) :: Nil case drop@DropDatabaseCommand(dbName, ifExists, isCascade) => diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala index 80f781e45d7..b6667dfb05b 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties /** @@ -39,7 +40,7 @@ object TableLoader { def extractOptions(propertiesFile: String): immutable.Map[String, String] = { val props = new Properties val path = new Path(propertiesFile) - val fs = path.getFileSystem(new Configuration()) + val fs = path.getFileSystem(FileFactory.getConfiguration) props.load(fs.open(path)) val elments = props.entrySet().iterator() val map = new mutable.HashMap[String, String]() diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java index b0711ba5fdb..8e68ef3ed26 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java @@ -50,6 +50,8 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeConverter; +import org.apache.hadoop.conf.Configuration; + /** * Executor class for executing the query on the selected segments to be merged. * This will fire a select * query and get the raw result. @@ -103,7 +105,8 @@ public CarbonCompactionExecutor(Map segmentMapping, * * @return List of Carbon iterators */ - public List processTableBlocks() throws QueryExecutionException, IOException { + public List processTableBlocks(Configuration configuration) throws + QueryExecutionException, IOException { List resultList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List list = null; @@ -131,7 +134,8 @@ public List processTableBlocks() throws QueryExecutionExcepti .size()); queryModel.setTableBlockInfos(list); resultList.add( - new RawResultIterator(executeBlockList(list, segmentId, task), sourceSegProperties, + new RawResultIterator(executeBlockList(list, segmentId, task, configuration), + sourceSegProperties, destinationSegProperties)); } } @@ -174,14 +178,14 @@ private SegmentProperties getSourceSegmentProperties(List listMe * @return */ private CarbonIterator executeBlockList(List blockList, - String segmentId, String taskId) + String segmentId, String taskId, Configuration configuration) throws QueryExecutionException, IOException { queryModel.setTableBlockInfos(blockList); QueryStatisticsRecorder executorRecorder = CarbonTimeStatisticsFactory .createExecutorRecorder(queryModel.getQueryId() + "_" + segmentId + "_" + taskId); queryStatisticsRecorders.add(executorRecorder); queryModel.setStatisticsRecorder(executorRecorder); - QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration); queryExecutorList.add(queryExecutor); return queryExecutor.execute(queryModel); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java index 
01db4f6fb35..dd5969f7fe4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java @@ -35,6 +35,8 @@ import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.hadoop.conf.Configuration; + public abstract class AbstractCarbonQueryExecutor { private static final LogService LOGGER = @@ -50,10 +52,11 @@ public abstract class AbstractCarbonQueryExecutor { * @param blockList * @return */ - CarbonIterator executeBlockList(List blockList) + CarbonIterator executeBlockList(List blockList, + Configuration configuration) throws QueryExecutionException, IOException { queryModel.setTableBlockInfos(blockList); - this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration); return queryExecutor.execute(queryModel); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java index daabd24808e..d32757c72c7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java @@ -34,6 +34,8 @@ import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator; import org.apache.carbondata.core.util.DataTypeConverter; +import org.apache.hadoop.conf.Configuration; + /** * Used to read carbon blocks when add/split partition */ @@ -48,7 +50,7 @@ public CarbonSplitExecutor(Map segmentMapping, CarbonTabl } public List processDataBlocks( - String segmentId, DataTypeConverter converter) + String segmentId, DataTypeConverter converter, Configuration configuration) throws QueryExecutionException, IOException { List list = null; queryModel = new QueryModelBuilder(carbonTable) @@ -64,7 +66,7 @@ public List processDataBlocks( list = taskBlockInfo.getTableBlockInfoList(task); LOGGER.info("for task -" + task + "-block size is -" + list.size()); queryModel.setTableBlockInfos(list); - resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(list))); + resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(list, configuration))); } return resultList; } diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java index e3dabb1ebd1..4859dd23e3d 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java @@ -29,6 +29,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.util.CarbonSessionInfo; +import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.carbondata.hadoop.api.CarbonFileInputFormat; import org.apache.hadoop.conf.Configuration; @@ -60,6 +62,7 @@ public class CarbonReaderBuilder { CarbonReaderBuilder(String tablePath, String tableName) { this.tablePath = tablePath; this.tableName = tableName; + 
ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo()); } diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index d41e3d0a8e0..065b4a91fd9 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -44,6 +44,8 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchema; import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.util.CarbonSessionInfo; +import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.ThriftWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -70,6 +72,10 @@ public class CarbonWriterBuilder { private int localDictionaryThreshold; private boolean isLocalDictionaryEnabled; + public CarbonWriterBuilder() { + ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo()); + } + /** * Sets the output path of the writer builder * @param path is the absolute path where output files are written diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java index 73742b05528..b6e7ad57e67 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java @@ -23,6 +23,7 @@ import java.util.UUID; import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -47,7 +48,7 @@ private ObjectArrayWritable writable; JsonCarbonWriter(CarbonLoadModel loadModel) throws IOException { - Configuration OutputHadoopConf = new Configuration(); + Configuration OutputHadoopConf = FileFactory.getConfiguration(); CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel); CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat(); JobID jobId = new JobID(UUID.randomUUID().toString(), 0); diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java index b93d80fe7d2..30aa4153fc3 100644 --- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java +++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java @@ -54,6 +54,7 @@ import org.apache.carbondata.hadoop.CarbonRecordReader; import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport; +import org.apache.hadoop.conf.Configuration; import org.apache.spark.search.SearchRequest; import org.apache.spark.search.SearchResult; import org.apache.spark.search.ShutdownRequest; @@ -135,7 +136,7 @@ private List handleRequest(SearchRequest request) // In search mode, reader will read multiple blocks by using a thread pool CarbonRecordReader reader = - new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport()); + new 
CarbonRecordReader<>(queryModel, new CarbonRowReadSupport(), new Configuration()); // read all rows by the reader List rows = new LinkedList<>();
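
Editor's note on the pattern these hunks apply (illustrative material, not part of the patch): the driver captures its Hadoop Configuration, which carries the S3 access key / secret key, wraps it in the SerializableConfiguration holder, broadcasts it, and each executor task installs it into ThreadLocalSessionInfo before touching FileFactory, so that FileFactory.getConfiguration returns the driver's settings instead of a fresh "new Configuration()". The standalone Scala sketch below walks through those three steps outside the CarbonData command classes. Assumptions are flagged explicitly: the object name ConfPropagationExample and the parallelized sample data are invented, and the sketch reads the public sparkContext.hadoopConfiguration because sessionState.newHadoopConf(), used in the hunks above, is only visible inside the org.apache.spark.sql package. SerializableConfiguration, ThreadLocalSessionInfo.setConfigurationToCurrentThread and FileFactory.getConfiguration are the APIs the patch itself relies on.

    import org.apache.spark.sql.SparkSession

    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.util.ThreadLocalSessionInfo
    import org.apache.carbondata.spark.rdd.SerializableConfiguration

    // Sketch only: mirrors the broadcast-and-set-thread-local pattern used in this patch.
    object ConfPropagationExample {
      def main(args: Array[String]): Unit = {
        // Local master only so the sketch runs standalone; a real job gets this from spark-submit.
        val spark = SparkSession.builder()
          .appName("conf-propagation-sketch")
          .master("local[2]")
          .getOrCreate()

        // Driver side: capture the Hadoop conf (e.g. fs.s3a.access.key / fs.s3a.secret.key)
        // and broadcast it wrapped in the serializable holder.
        val conf = spark.sparkContext.broadcast(
          new SerializableConfiguration(spark.sparkContext.hadoopConfiguration))

        val tagged = spark.sparkContext.parallelize(Seq("seg_0", "seg_1"), 2).mapPartitions { iter =>
          // Executor side: install the broadcast conf into the thread local first, so any
          // later FileFactory call on this thread sees the driver's SK/AK instead of defaults.
          ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
          val hadoopConf = FileFactory.getConfiguration
          iter.map(segment => segment + " -> " + hadoopConf.get("fs.defaultFS"))
        }.collect()

        tagged.foreach(println)
        spark.stop()
      }
    }

The explicit broadcast appears in CarbonDictionaryDecoder, CarbonLoadDataCommand, DeleteExecution and HorizontalCompaction above; the RDD classes that now take a SparkSession instead of a SparkContext (AlterTableAddColumnRDD, CarbonDropPartitionRDD, CarbonDecoderRDD and friends) appear to delegate the same propagation to the CarbonRDD base class. On the pure-Java read path (CarbonCompactionExecutor, AbstractCarbonQueryExecutor, CarbonSplitExecutor, SearchRequestHandler) the configuration is instead passed down as an explicit method or constructor parameter, since no Spark closure is available there.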