[CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing Hadoop configuration from driver.

Add SK/AK to a thread local so that on each query a new SK/AK can be passed to FileFactory.
Refactor FileFactory to accept the configuration from the thread local.
Fix a compatibility issue from 1.3.x to 1.5.x [CARBONDATA-2865].

This closes #2623
kunal642 authored and ravipesala committed Aug 28, 2018
1 parent 1fb1f19 commit 2a9604c
Showing 80 changed files with 428 additions and 314 deletions.
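
At a high level, the change works like this: the driver serializes its Hadoop Configuration (which carries the S3 secret key / access key) to each executor, the executor attaches that Configuration to the current thread, and FileFactory resolves the thread-local Configuration before falling back to the static default. Below is a minimal, self-contained sketch of that pattern; the class and method names are simplified stand-ins, not the exact CarbonData classes changed in this diff.

import org.apache.hadoop.conf.Configuration;

// Sketch only: a per-thread Configuration slot plus a FileFactory-style
// lookup that prefers it over a static default.
public final class ThreadLocalConfSketch {

  private static final ThreadLocal<Configuration> THREAD_CONF = new ThreadLocal<>();
  private static final Configuration DEFAULT_CONF = new Configuration();

  // Executor side: called once per query thread before any file access.
  static void setConfigurationToCurrentThread(Configuration conf) {
    THREAD_CONF.set(conf);
  }

  // FileFactory-style getter: the thread-local conf wins, default otherwise.
  static Configuration getConfiguration() {
    Configuration conf = THREAD_CONF.get();
    return conf != null ? conf : DEFAULT_CONF;
  }

  public static void main(String[] args) {
    // Driver side: the conf that gets serialized to executors would carry
    // the S3 credentials (the values here are placeholders).
    Configuration driverConf = new Configuration();
    driverConf.set("fs.s3a.access.key", "<AK>");
    driverConf.set("fs.s3a.secret.key", "<SK>");

    // Executor side: attach it, then every lookup on this thread sees it.
    setConfigurationToCurrentThread(driverConf);
    System.out.println(getConfiguration().get("fs.s3a.access.key")); // <AK>
  }
}
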
@@ -25,6 +25,7 @@
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,7 +38,10 @@ public class DFSFileReaderImpl implements FileReader {

private boolean readPageByPage;

public DFSFileReaderImpl() {
private Configuration configuration;

public DFSFileReaderImpl(Configuration configuration) {
this.configuration = configuration;
this.fileNameAndStreamCache =
new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
}
@@ -60,7 +64,7 @@ private FSDataInputStream updateCache(String filePath) throws IOException {
FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
if (null == fileChannel) {
Path pt = new Path(filePath);
FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
FileSystem fs = pt.getFileSystem(configuration);
fileChannel = fs.open(pt);
fileNameAndStreamCache.put(filePath, fileChannel);
}
@@ -29,15 +29,15 @@

public class DefaultFileTypeProvider implements FileTypeInterface {

public FileReader getFileHolder(FileFactory.FileType fileType) {
public FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration) {
switch (fileType) {
case LOCAL:
return new FileReaderImpl();
case HDFS:
case ALLUXIO:
case VIEWFS:
case S3:
return new DFSFileReaderImpl();
return new DFSFileReaderImpl(configuration);
default:
return new FileReaderImpl();
}
@@ -30,6 +30,7 @@
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@@ -59,11 +60,23 @@ private FileFactory() {
}

public static Configuration getConfiguration() {
return configuration;
Configuration conf;
Object confObject = ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
.getNonSerializableExtraInfo().get("carbonConf");
if (confObject == null) {
conf = configuration;
} else {
conf = (Configuration) confObject;
}
return conf;
}

public static FileReader getFileHolder(FileType fileType) {
return fileFileTypeInterface.getFileHolder(fileType);
return fileFileTypeInterface.getFileHolder(fileType, getConfiguration());
}

public static FileReader getFileHolder(FileType fileType, Configuration configuration) {
return fileFileTypeInterface.getFileHolder(fileType, configuration);
}

public static FileType getFileType(String path) {
@@ -100,7 +113,7 @@ public static DataInputStream getDataInputStream(String path, FileType fileType)

public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
throws IOException {
return getDataInputStream(path, fileType, bufferSize, configuration);
return getDataInputStream(path, fileType, bufferSize, getConfiguration());
}
public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
Configuration configuration) throws IOException {
@@ -306,7 +319,7 @@ public static void truncateFile(String path, FileType fileType, long newSize) th
// this method was new in hadoop 2.7, otherwise use CarbonFile.truncate to do this.
try {
Path pt = new Path(path);
FileSystem fs = pt.getFileSystem(configuration);
FileSystem fs = pt.getFileSystem(getConfiguration());
Method truncateMethod = fs.getClass().getDeclaredMethod("truncate",
new Class[]{Path.class, long.class});
truncateMethod.invoke(fs, new Object[]{pt, newSize});
@@ -414,7 +427,7 @@ public static long getDirectorySize(String filePath) throws IOException {
case VIEWFS:
case S3:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(configuration);
FileSystem fs = path.getFileSystem(getConfiguration());
return fs.getContentSummary(path).getLength();
case LOCAL:
default:
@@ -442,7 +455,7 @@ public static Path getPath(String filePath) {
* @throws IOException
*/
public static FileSystem getFileSystem(Path path) throws IOException {
return path.getFileSystem(configuration);
return path.getFileSystem(getConfiguration());
}


@@ -455,7 +468,7 @@ public static void createDirectoryAndSetPermission(String directoryPath, FsPermi
case VIEWFS:
try {
Path path = new Path(directoryPath);
FileSystem fs = path.getFileSystem(FileFactory.configuration);
FileSystem fs = path.getFileSystem(getConfiguration());
if (!fs.exists(path)) {
fs.mkdirs(path);
fs.setPermission(path, permission);
@@ -24,7 +24,7 @@

public interface FileTypeInterface {

FileReader getFileHolder(FileFactory.FileType fileType);
FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration);
CarbonFile getCarbonFile(String path, FileFactory.FileType fileType);
CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration);
}
@@ -23,24 +23,26 @@
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.CarbonProperties;

import org.apache.hadoop.conf.Configuration;

/**
* Factory class to get the query executor from RDD
* This will return the executor based on query type
*/
public class QueryExecutorFactory {

public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
public static QueryExecutor getQueryExecutor(QueryModel queryModel, Configuration configuration) {
if (CarbonProperties.isSearchModeEnabled()) {
if (queryModel.isVectorReader()) {
return new SearchModeVectorDetailQueryExecutor();
return new SearchModeVectorDetailQueryExecutor(configuration);
} else {
return new SearchModeDetailQueryExecutor();
return new SearchModeDetailQueryExecutor(configuration);
}
} else {
if (queryModel.isVectorReader()) {
return new VectorDetailQueryExecutor();
return new VectorDetailQueryExecutor(configuration);
} else {
return new DetailQueryExecutor();
return new DetailQueryExecutor(configuration);
}
}
}
@@ -68,10 +68,12 @@
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;

/**
* This class provides a skeletal implementation of the {@link QueryExecutor}
@@ -96,7 +98,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
*/
protected CarbonIterator queryIterator;

public AbstractQueryExecutor() {
public AbstractQueryExecutor(Configuration configuration) {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
queryProperties = new QueryExecutorProperties();
}

@@ -27,13 +27,19 @@
import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator;

import org.apache.hadoop.conf.Configuration;

/**
* Below class will be used to execute the detail query
* For executing the detail query it will pass all the block execution
* info to detail query result iterator and iterator will be returned
*/
public class DetailQueryExecutor extends AbstractQueryExecutor<RowBatch> {

public DetailQueryExecutor(Configuration configuration) {
super(configuration);
}

@Override
public CarbonIterator<RowBatch> execute(QueryModel queryModel)
throws QueryExecutionException, IOException {
@@ -31,13 +31,15 @@
import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
import org.apache.carbondata.core.util.CarbonProperties;

import org.apache.hadoop.conf.Configuration;

public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object> {
private static final LogService LOGGER =
LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName());
private static ExecutorService executorService = null;

public SearchModeDetailQueryExecutor() {
public SearchModeDetailQueryExecutor(Configuration configuration) {
super(configuration);
if (executorService == null) {
initThreadPool();
}
@@ -32,6 +32,8 @@

import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;

import org.apache.hadoop.conf.Configuration;

/**
* Below class will be used to execute the detail query and returns columnar vectors.
*/
@@ -40,7 +42,8 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
private static ExecutorService executorService = null;

public SearchModeVectorDetailQueryExecutor() {
public SearchModeVectorDetailQueryExecutor(Configuration configuration) {
super(configuration);
if (executorService == null) {
initThreadPool();
}
@@ -26,11 +26,17 @@
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.iterator.VectorDetailQueryResultIterator;

import org.apache.hadoop.conf.Configuration;

/**
* Below class will be used to execute the detail query and returns columnar vectors.
*/
public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {

public VectorDetailQueryExecutor(Configuration configuration) {
super(configuration);
}

@Override
public CarbonIterator<Object> execute(QueryModel queryModel)
throws QueryExecutionException, IOException {
@@ -23,11 +23,11 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -94,7 +94,7 @@ public final class CarbonProperties {
/**
* It is purely for testing
*/
private Map<String, String> addedProperty = new HashMap<>();
private Map<String, String> addedProperty = new ConcurrentHashMap<>();

/**
* Private constructor this will call load properties method to load all the
@@ -407,7 +407,7 @@ private void validateLockType() {
* @param lockTypeConfigured
*/
private void validateAndConfigureLockType(String lockTypeConfigured) {
Configuration configuration = new Configuration(true);
Configuration configuration = FileFactory.getConfiguration();
String defaultFs = configuration.get("fs.defaultFS");
if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
|| defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs
@@ -134,8 +134,6 @@ public final class CarbonUtil {
*/
private static final int CONST_HUNDRED = 100;

private static final Configuration conf = new Configuration(true);

/**
* dfs.bytes-per-checksum
* HDFS checksum length, block size for a file should be exactly divisible
@@ -662,7 +660,7 @@ public static String delimiterConverter(String delimiter) {
*/
public static String checkAndAppendHDFSUrl(String filePath) {
String currentPath = filePath;
String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
String baseDFSUrl = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL, "");
if (checkIfPrefixExists(filePath)) {
@@ -699,7 +697,7 @@ public static String checkAndAppendFileSystemURIScheme(String filePath) {
filePath = "/" + filePath;
}
currentPath = filePath;
String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
if (defaultFsUrl == null) {
return currentPath;
}
@@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.carbondata.common.constants.LoggerAction;
import org.apache.carbondata.common.logging.LogService;
@@ -57,12 +58,12 @@ public class SessionParams implements Serializable, Cloneable {
private static final long serialVersionUID = -7801994600594915264L;

private Map<String, String> sProps;
private Map<String, String> addedProps;
private ConcurrentHashMap<String, String> addedProps;
// below field to be used when we want the objects to be serialized
private Map<String, Object> extraInfo;
public SessionParams() {
sProps = new HashMap<>();
addedProps = new HashMap<>();
addedProps = new ConcurrentHashMap<>();
extraInfo = new HashMap<>();
}

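
The HashMap-to-ConcurrentHashMap swaps above (CarbonProperties.addedProperty and SessionParams.addedProps) guard these maps against concurrent writes, e.g. from multiple query threads each carrying their own configuration. A small illustrative sketch (not CarbonData code) of the guarantee this buys:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Two threads writing to one ConcurrentHashMap: all puts land, no corruption.
// A plain HashMap gives no such guarantee under concurrent writes.
public class ConcurrentPropsSketch {
  private static final Map<String, String> addedProps = new ConcurrentHashMap<>();

  public static void main(String[] args) throws InterruptedException {
    Runnable writer = () -> {
      for (int i = 0; i < 10_000; i++) {
        addedProps.put(Thread.currentThread().getName() + "-" + i, "v");
      }
    };
    Thread q1 = new Thread(writer, "query-1");
    Thread q2 = new Thread(writer, "query-2");
    q1.start(); q2.start();
    q1.join(); q2.join();
    System.out.println(addedProps.size()); // always 20000
  }
}

One caveat of the swap: unlike HashMap, ConcurrentHashMap rejects null keys and values, so callers must not store null property values.
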
@@ -17,6 +17,8 @@

package org.apache.carbondata.core.util;

import org.apache.hadoop.conf.Configuration;

/**
* This class maintains ThreadLocal session params
*/
@@ -31,4 +33,22 @@ public static void setCarbonSessionInfo(CarbonSessionInfo carbonSessionInfo) {
public static CarbonSessionInfo getCarbonSessionInfo() {
return threadLocal.get();
}

public static synchronized CarbonSessionInfo getOrCreateCarbonSessionInfo() {
CarbonSessionInfo info = threadLocal.get();
if (info == null || info.getSessionParams() == null) {
info = new CarbonSessionInfo();
info.setSessionParams(new SessionParams());
threadLocal.set(info);
}
return info;
}

public static void setConfigurationToCurrentThread(Configuration configuration) {
getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo().put("carbonConf", configuration);
}

public static void unsetAll() {
threadLocal.remove();
}
}
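
Putting the pieces together: the new AbstractQueryExecutor(Configuration) constructor calls setConfigurationToCurrentThread, and FileFactory.getConfiguration() then finds the conf under the "carbonConf" key in the thread-local session info. A hedged sketch of the resulting executor-side flow, using the method names from this diff (the FileFactory import path, the sample path, and the way the conf arrives from the driver are assumptions):

import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.hadoop.conf.Configuration;

public class ExecutorSideFlowSketch {
  public static void main(String[] args) throws Exception {
    // In the real flow this Configuration is deserialized from the driver
    // and already carries the SK/AK; here it is just a default conf.
    Configuration confFromDriver = new Configuration();

    // What AbstractQueryExecutor(Configuration) now does in its constructor:
    ThreadLocalSessionInfo.setConfigurationToCurrentThread(confFromDriver);

    // Every FileFactory call on this thread now resolves confFromDriver via
    // getConfiguration() instead of the executor's static default.
    String path = "hdfs://namenode:8020/store/part-0.carbondata"; // placeholder
    FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(path));
  }
}

The new unsetAll() above gives callers a way to clear the slot, e.g. so a pooled executor thread does not carry a previous query's credentials into the next query.
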
@@ -26,6 +26,7 @@

import mockit.Mock;
import mockit.MockUp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,7 +46,7 @@ public class DFSFileReaderImplUnitTest {
private static File fileWithEmptyContent;

@BeforeClass public static void setup() {
dfsFileHolder = new DFSFileReaderImpl();
dfsFileHolder = new DFSFileReaderImpl(new Configuration());
file = new File("Test.carbondata");
fileWithEmptyContent = new File("TestEXception.carbondata");

