Merge ad6f99d into 706e8d3
KanakaKumar committed May 29, 2019
2 parents 706e8d3 + ad6f99d commit 3143275
Showing 12 changed files with 247 additions and 64 deletions.
@@ -1600,6 +1600,11 @@ private CarbonCommonConstants() {
*/
public static final String S3_SECRET_KEY = "fs.s3.awsSecretAccessKey";

/**
* Configuration key for the custom file provider
*/
public static final String CUSTOM_FILE_PROVIDER = "carbon.fs.custom.file.provider";

/**
* FS_DEFAULT_FS
*/
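As an aside for readers of this change: a minimal sketch of how the new property might be registered before the first FileFactory access, assuming CarbonProperties' addProperty setter; com.example.MyFileTypeProvider is a hypothetical provider class (sketched below after the DefaultFileTypeProvider changes), not part of this commit.

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public class CustomProviderSetup {
  public static void main(String[] args) {
    // Register the custom provider class before the first FileFactory access.
    // "com.example.MyFileTypeProvider" is a hypothetical class implementing FileTypeInterface.
    CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.CUSTOM_FILE_PROVIDER,
        "com.example.MyFileTypeProvider");
  }
}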
@@ -404,8 +404,8 @@ public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fi
return new DataOutputStream(new BufferedOutputStream(outputStream));
}

@Override public boolean isFileExist(String filePath, FileFactory.FileType fileType,
boolean performFileCheck) throws IOException {
@Override public boolean isFileExist(String filePath, boolean performFileCheck)
throws IOException {
filePath = filePath.replace("\\", "/");
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
@@ -416,7 +416,7 @@ public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fi
}
}

@Override public boolean isFileExist(String filePath, FileFactory.FileType fileType)
@Override public boolean isFileExist(String filePath)
throws IOException {
filePath = filePath.replace("\\", "/");
Path path = new Path(filePath);
@@ -139,10 +139,10 @@ DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, int bufferSize,
String compressor) throws IOException;

boolean isFileExist(String filePath, FileFactory.FileType fileType, boolean performFileCheck)
boolean isFileExist(String filePath, boolean performFileCheck)
throws IOException;

boolean isFileExist(String filePath, FileFactory.FileType fileType) throws IOException;
boolean isFileExist(String filePath) throws IOException;

boolean createNewFile(String filePath, FileFactory.FileType fileType) throws IOException;

@@ -410,10 +410,10 @@ public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fi
}
}

@Override public boolean isFileExist(String filePath, FileFactory.FileType fileType,
boolean performFileCheck) throws IOException {
@Override public boolean isFileExist(String filePath, boolean performFileCheck)
throws IOException {
filePath = filePath.replace("\\", "/");
filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
filePath = FileFactory.getUpdatedFilePath(filePath);
File defaultFile = new File(filePath);

if (performFileCheck) {
@@ -423,10 +423,10 @@ public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fi
}
}

@Override public boolean isFileExist(String filePath, FileFactory.FileType fileType)
@Override public boolean isFileExist(String filePath)
throws IOException {
filePath = filePath.replace("\\", "/");
filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
filePath = FileFactory.getUpdatedFilePath(filePath);
File defaultFile = new File(filePath);
return defaultFile.exists();
}
@@ -17,50 +17,80 @@

package org.apache.carbondata.core.datastore.impl;

import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile;
import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile;
import org.apache.carbondata.core.util.CarbonProperties;

import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
* FileType provider to create a CarbonFile specific to the file system to which the path belongs.
*/
public class DefaultFileTypeProvider implements FileTypeInterface {

public FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration) {
switch (fileType) {
case LOCAL:
return new FileReaderImpl();
case HDFS:
case ALLUXIO:
case VIEWFS:
case S3:
return new DFSFileReaderImpl(configuration);
default:
return new FileReaderImpl();
private static final Logger LOGGER =
LogServiceFactory.getLogService(DefaultFileTypeProvider.class.getName());

/**
* Custom file type provider supporting non-default file systems.
*/
protected FileTypeInterface customFileTypeProvider = null;

protected boolean customFileTypeProviderInitialized = false;

public DefaultFileTypeProvider() {
}

/**
* This method is needed, in addition to the constructor, to break the circular dependency below:
* CarbonProperties --> FileFactory --> DefaultFileTypeProvider --> CarbonProperties
*/
private void initializeCustomFileprovider() {
if (!customFileTypeProviderInitialized) {
customFileTypeProviderInitialized = true;
String customFileProvider =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CUSTOM_FILE_PROVIDER);
if (customFileProvider != null && !customFileProvider.trim().isEmpty()) {
try {
customFileTypeProvider =
(FileTypeInterface) Class.forName(customFileProvider).newInstance();
} catch (Exception e) {
LOGGER.error("Unable to load the configured FileTypeInterface class. Ignoring it.", e);
}
}
}
}

public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType) {
switch (fileType) {
case LOCAL:
return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
case HDFS:
return new HDFSCarbonFile(path);
case S3:
return new S3CarbonFile(path);
case ALLUXIO:
return new AlluxioCarbonFile(path);
case VIEWFS:
return new ViewFSCarbonFile(path);
default:
return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
/**
* Delegate to the custom file provider to check whether the path is supported.
* Note that this method does not check the default supported file systems, because
* #getCarbonFile expects this method's result to come from customFileTypeProvider.
*
* @param path path of the file
* @return true if the path is supported by the custom provider
*/
@Override public boolean isPathSupported(String path) {
initializeCustomFileprovider();
if (customFileTypeProvider != null) {
return customFileTypeProvider.isPathSupported(path);
}
return false;
}

public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration conf) {
public CarbonFile getCarbonFile(String path, Configuration conf) {
// Handle the custom file type first
if (isPathSupported(path)) {
return customFileTypeProvider.getCarbonFile(path, conf);
}

FileFactory.FileType fileType = FileFactory.getFileType(path);
switch (fileType) {
case LOCAL:
return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
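To make the reflective loading above concrete, here is a minimal, hypothetical sketch of a custom provider for an imaginary myfs:// scheme; it is not part of this commit. Because DefaultFileTypeProvider instantiates the class via Class.forName(...).newInstance(), a public no-arg constructor is required; the LocalCarbonFile return value is only a placeholder for a real file-system-specific CarbonFile.

package com.example; // hypothetical package and class, for illustration only

import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
import org.apache.carbondata.core.datastore.impl.FileTypeInterface;

import org.apache.hadoop.conf.Configuration;

public class MyFileTypeProvider implements FileTypeInterface {

  // Public no-arg constructor so Class.forName(...).newInstance() can create it.
  public MyFileTypeProvider() {
  }

  @Override public boolean isPathSupported(String path) {
    // Claim only the imaginary "myfs://" scheme; everything else falls back
    // to the default handling in DefaultFileTypeProvider.
    return path != null && path.startsWith("myfs://");
  }

  @Override public CarbonFile getCarbonFile(String path, Configuration configuration) {
    // Placeholder: a real provider would return a CarbonFile backed by its own FileSystem.
    return new LocalCarbonFile(path);
  }
}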
@@ -52,10 +52,12 @@ public final class FileFactory {
configuration.addResource(new Path("../core-default.xml"));
}

private static FileTypeInterface fileFileTypeInterface = new DefaultFileTypeProvider();
public static void setFileTypeInterface(FileTypeInterface fileTypeInterface) {
private static DefaultFileTypeProvider fileFileTypeInterface = new DefaultFileTypeProvider();

public static void setFileTypeInterface(DefaultFileTypeProvider fileTypeInterface) {
fileFileTypeInterface = fileTypeInterface;
}

private FileFactory() {

}
@@ -73,11 +75,22 @@ public static Configuration getConfiguration() {
}

public static FileReader getFileHolder(FileType fileType) {
return fileFileTypeInterface.getFileHolder(fileType, getConfiguration());
return getFileHolder(fileType, getConfiguration());
}

public static FileReader getFileHolder(FileType fileType, Configuration configuration) {
return fileFileTypeInterface.getFileHolder(fileType, configuration);
public static FileReader getFileHolder(FileFactory.FileType fileType,
Configuration configuration) {
switch (fileType) {
case LOCAL:
return new FileReaderImpl();
case HDFS:
case ALLUXIO:
case VIEWFS:
case S3:
return new DFSFileReaderImpl(configuration);
default:
return new FileReaderImpl();
}
}

public static FileType getFileType(String path) {
@@ -89,6 +102,17 @@ public static FileType getFileType(String path) {
if (fileType != null) {
return fileType;
}

// If a custom file provider is configured, check whether it supports this path.
if (fileFileTypeInterface.isPathSupported(path)) {
return FileType.CUSTOM;
}

// If it is an unsupported file system, throw an error instead of proceeding with wrong behavior.
if (path.contains("://") && !path.startsWith("file://")) {
throw new IllegalArgumentException("Path belongs to unsupported file system " + path);
}

return FileType.LOCAL;
}

Expand Down Expand Up @@ -124,14 +148,15 @@ private static FileType getFileTypeWithActualPath(String path) {
}

public static CarbonFile getCarbonFile(String path) {
return fileFileTypeInterface.getCarbonFile(path, getFileType(path));
return fileFileTypeInterface.getCarbonFile(path, getConfiguration());
}
public static CarbonFile getCarbonFile(String path, FileType fileType) {
return fileFileTypeInterface.getCarbonFile(path, fileType);
// TODO: fileType is ignored here to avoid a larger refactoring; remove the unused argument later.
return fileFileTypeInterface.getCarbonFile(path, getConfiguration());
}
public static CarbonFile getCarbonFile(String path,
Configuration hadoopConf) {
return fileFileTypeInterface.getCarbonFile(path, getFileType(path), hadoopConf);
return fileFileTypeInterface.getCarbonFile(path, hadoopConf);
}

public static DataInputStream getDataInputStream(String path, FileType fileType)
@@ -229,12 +254,11 @@ public static DataOutputStream getDataOutputStream(String path, FileType fileTyp
* not if the performFileCheck is true
*
* @param filePath - Path
* @param fileType - FileType Local/HDFS
* @param performFileCheck - Provide false for folders, true for files and
*/
public static boolean isFileExist(String filePath, FileType fileType, boolean performFileCheck)
public static boolean isFileExist(String filePath, boolean performFileCheck)
throws IOException {
return getCarbonFile(filePath).isFileExist(filePath, fileType, performFileCheck);
return getCarbonFile(filePath).isFileExist(filePath, performFileCheck);
}

/**
@@ -244,7 +268,7 @@ public static boolean isFileExist(String filePath, FileType fileType, boolean pe
* @param fileType - FileType Local/HDFS
*/
public static boolean isFileExist(String filePath, FileType fileType) throws IOException {
return getCarbonFile(filePath).isFileExist(filePath, fileType);
return getCarbonFile(filePath).isFileExist(filePath);
}

/**
@@ -347,6 +371,7 @@ public static void truncateFile(String path, FileType fileType, long newSize) th
case HDFS:
case ALLUXIO:
case VIEWFS:
case CUSTOM:
case S3:
// if hadoop version >= 2.7, it can call method 'FileSystem.truncate' to truncate file,
// this method was new in hadoop 2.7, otherwise use CarbonFile.truncate to do this.
@@ -392,7 +417,7 @@ public static boolean createNewLockFile(String filePath, FileType fileType) thro
}

public enum FileType {
LOCAL, HDFS, ALLUXIO, VIEWFS, S3
LOCAL, HDFS, ALLUXIO, VIEWFS, S3, CUSTOM
}

/**
@@ -413,6 +438,7 @@ public static String addSchemeIfNotExists(String filePath) {
case ALLUXIO:
case VIEWFS:
case S3:
case CUSTOM:
default:
return filePath;
}
@@ -432,6 +458,7 @@ public static String getUpdatedFilePath(String filePath, FileType fileType) {
case HDFS:
case VIEWFS:
case S3:
case CUSTOM:
return filePath;
case ALLUXIO:
return StringUtils.startsWith(filePath, "alluxio") ? filePath : "alluxio:///" + filePath;
@@ -483,6 +510,7 @@ public static long getDirectorySize(String filePath) throws IOException {
case ALLUXIO:
case VIEWFS:
case S3:
case CUSTOM:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(getConfiguration());
return fs.getContentSummary(path).getLength();
@@ -524,6 +552,7 @@ public static void createDirectoryAndSetPermission(String directoryPath, FsPermi
case HDFS:
case ALLUXIO:
case VIEWFS:
case CUSTOM:
try {
Path path = new Path(directoryPath);
FileSystem fs = path.getFileSystem(getConfiguration());
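For orientation, a hedged sketch of how callers interact with the simplified FileFactory API after this change; the HDFS URI is illustrative only, and a path claimed by a configured custom provider would resolve to FileType.CUSTOM instead.

import java.io.IOException;

import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;

public class FileFactoryUsage {
  public static void main(String[] args) throws IOException {
    String path = "hdfs://namenode:8020/carbon/store/schema"; // illustrative path

    // The file type is now derived from the path; callers no longer pass FileType explicitly.
    FileFactory.FileType type = FileFactory.getFileType(path);

    // isFileExist and getCarbonFile resolve the right CarbonFile implementation internally.
    boolean exists = FileFactory.isFileExist(path, true);
    CarbonFile file = FileFactory.getCarbonFile(path);

    System.out.println(type + ", exists=" + exists + ", name=" + file.getName());
  }
}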
@@ -17,15 +17,30 @@

package org.apache.carbondata.core.datastore.impl;

import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;

import org.apache.hadoop.conf.Configuration;

/**
* Interface to create a CarbonFile instance specific to the FileSystem where the path belongs.
*/
public interface FileTypeInterface {

FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration);
CarbonFile getCarbonFile(String path, FileFactory.FileType fileType);
CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration);
/**
* Return the correct CarbonFile instance.
*
* @param path path of the file
* @param configuration configuration
* @return CarbonFile instance
*/
public CarbonFile getCarbonFile(String path, Configuration configuration);

/**
* Check whether the FileSystem mapped to the given path is supported.
*
* @param path path of the file
* @return true if supported, false if not supported
*/
public boolean isPathSupported(String path);
}

@@ -39,10 +39,7 @@ public class SchemaReader {
public static CarbonTable readCarbonTableFromStore(AbsoluteTableIdentifier identifier)
throws IOException {
String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.S3) ||
FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
if (FileFactory.isFileExist(schemaFilePath)) {
String tableName = identifier.getCarbonTableIdentifier().getTableName();

org.apache.carbondata.format.TableInfo tableInfo =
@@ -2566,6 +2566,7 @@ private static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath,
case ALLUXIO:
case VIEWFS:
case S3:
case CUSTOM:
Path path = new Path(segmentPath);
FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
if (fs.exists(path)) {
