[CARBONDATA-3404] Support CarbonFile API through FileTypeInterface to use custom FileSystem

Currently CarbonData supports only a limited set of file systems, such as the HDFS, S3, and VIEWFS schemes.
If a user configures a table path on a file system outside this supported set, FileFactory falls back to LocalCarbonFile by default, which causes errors.

This PR adds an API that lets users plug in a custom file provider through FileTypeInterface and extend CarbonFile, overriding the required methods from AbstractCarbonFile when specific handling is needed for operations such as renameForce.
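As a rough illustration of the intended extension point, here is a minimal sketch of a custom provider. It assumes, based on the DefaultFileTypeProvider changes in this commit, that FileTypeInterface exposes isPathSupported(String) and getCarbonFile(String, Configuration); the myfs:// scheme, the org.example.carbondata package, and the MyFsCarbonFile class are hypothetical placeholders, not part of this change.

package org.example.carbondata;

import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile;
import org.apache.carbondata.core.datastore.impl.FileTypeInterface;
import org.apache.hadoop.conf.Configuration;

// Hypothetical provider for a "myfs://" scheme; it claims only the paths it recognizes,
// so every other scheme still resolves through the default CarbonData providers.
public class MyFsFileTypeProvider implements FileTypeInterface {

  @Override public boolean isPathSupported(String path) {
    return path != null && path.startsWith("myfs://");
  }

  @Override public CarbonFile getCarbonFile(String path, Configuration conf) {
    return new MyFsCarbonFile(path);
  }

  // Hypothetical CarbonFile that reuses the HDFS implementation and overrides only the
  // operations (for example renameForce) that need file-system-specific handling.
  public static class MyFsCarbonFile extends HDFSCarbonFile {
    public MyFsCarbonFile(String path) {
      super(path);
    }
  }
}

The provider class name would then be registered under the new carbon.fs.custom.file.provider property (see the sketch after the constants hunk below), so FileFactory can load it reflectively.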

This closes #3246
KanakaKumar authored and ravipesala committed Jun 6, 2019
1 parent 6fa7fb4 commit 85f1b9f
Showing 13 changed files with 282 additions and 85 deletions.
@@ -1600,6 +1600,11 @@ private CarbonCommonConstants() {
*/
public static final String S3_SECRET_KEY = "fs.s3.awsSecretAccessKey";

/**
* Configuration Key for custom file provider
*/
public static final String CUSTOM_FILE_PROVIDER = "carbon.fs.custom.file.provider";

/**
* FS_DEFAULT_FS
*/
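As a hedged aside on how the new CUSTOM_FILE_PROVIDER key above might be wired up: one likely route is the standard CarbonProperties API, or an equivalent entry in carbon.properties. The provider class name below is the hypothetical one from the sketch above.

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public final class RegisterCustomProvider {
  public static void main(String[] args) {
    // Register the custom provider before any FileFactory call resolves a path;
    // equivalently, carbon.properties could carry:
    //   carbon.fs.custom.file.provider=org.example.carbondata.MyFsFileTypeProvider
    CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.CUSTOM_FILE_PROVIDER,
        "org.example.carbondata.MyFsFileTypeProvider");
  }
}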
@@ -404,8 +404,8 @@ public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fi
return new DataOutputStream(new BufferedOutputStream(outputStream));
}

@Override public boolean isFileExist(String filePath, FileFactory.FileType fileType,
boolean performFileCheck) throws IOException {
@Override public boolean isFileExist(String filePath, boolean performFileCheck)
throws IOException {
filePath = filePath.replace("\\", "/");
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
@@ -416,7 +416,7 @@ public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fi
}
}

@Override public boolean isFileExist(String filePath, FileFactory.FileType fileType)
@Override public boolean isFileExist(String filePath)
throws IOException {
filePath = filePath.replace("\\", "/");
Path path = new Path(filePath);
@@ -139,10 +139,10 @@ DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, int bufferSize,
String compressor) throws IOException;

boolean isFileExist(String filePath, FileFactory.FileType fileType, boolean performFileCheck)
boolean isFileExist(String filePath, boolean performFileCheck)
throws IOException;

boolean isFileExist(String filePath, FileFactory.FileType fileType) throws IOException;
boolean isFileExist(String filePath) throws IOException;

boolean createNewFile(String filePath, FileFactory.FileType fileType) throws IOException;

@@ -410,10 +410,10 @@ public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fi
}
}

@Override public boolean isFileExist(String filePath, FileFactory.FileType fileType,
boolean performFileCheck) throws IOException {
@Override public boolean isFileExist(String filePath, boolean performFileCheck)
throws IOException {
filePath = filePath.replace("\\", "/");
filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
filePath = FileFactory.getUpdatedFilePath(filePath);
File defaultFile = new File(filePath);

if (performFileCheck) {
@@ -423,10 +423,10 @@ public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fi
}
}

@Override public boolean isFileExist(String filePath, FileFactory.FileType fileType)
@Override public boolean isFileExist(String filePath)
throws IOException {
filePath = filePath.replace("\\", "/");
filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
filePath = FileFactory.getUpdatedFilePath(filePath);
File defaultFile = new File(filePath);
return defaultFile.exists();
}
@@ -17,50 +17,80 @@

package org.apache.carbondata.core.datastore.impl;

import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile;
import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile;
import org.apache.carbondata.core.util.CarbonProperties;

import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
* FileType provider to create a CarbonFile specific to the file system to which the path belongs.
*/
public class DefaultFileTypeProvider implements FileTypeInterface {

public FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration) {
switch (fileType) {
case LOCAL:
return new FileReaderImpl();
case HDFS:
case ALLUXIO:
case VIEWFS:
case S3:
return new DFSFileReaderImpl(configuration);
default:
return new FileReaderImpl();
private static final Logger LOGGER =
LogServiceFactory.getLogService(DefaultFileTypeProvider.class.getName());

/**
* Custom file type provider for supporting non-default file systems.
*/
protected FileTypeInterface customFileTypeProvider = null;

protected boolean customFileTypeProviderInitialized = false;

public DefaultFileTypeProvider() {
}

/**
* This method is required, in addition to the constructor, to handle the circular dependency below:
* CarbonProperties-->FileFactory-->DefaultFileTypeProvider-->CarbonProperties
*/
private void initializeCustomFileprovider() {
if (!customFileTypeProviderInitialized) {
customFileTypeProviderInitialized = true;
String customFileProvider =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CUSTOM_FILE_PROVIDER);
if (customFileProvider != null && !customFileProvider.trim().isEmpty()) {
try {
customFileTypeProvider =
(FileTypeInterface) Class.forName(customFileProvider).newInstance();
} catch (Exception e) {
LOGGER.error("Unable load configured FileTypeInterface class. Ignored.", e);
}
}
}
}

public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType) {
switch (fileType) {
case LOCAL:
return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
case HDFS:
return new HDFSCarbonFile(path);
case S3:
return new S3CarbonFile(path);
case ALLUXIO:
return new AlluxioCarbonFile(path);
case VIEWFS:
return new ViewFSCarbonFile(path);
default:
return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
/**
* Delegates to the custom file provider to check whether the path is supported.
* Note that this method does not check the default supported file systems, because #getCarbonFile
* expects this method's result to come from customFileTypeProvider.
*
* @param path path of the file
* @return true if the path is supported by the custom file provider
*/
@Override public boolean isPathSupported(String path) {
initializeCustomFileprovider();
if (customFileTypeProvider != null) {
return customFileTypeProvider.isPathSupported(path);
}
return false;
}

public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration conf) {
public CarbonFile getCarbonFile(String path, Configuration conf) {
// Handle the custom file type first
if (isPathSupported(path)) {
return customFileTypeProvider.getCarbonFile(path, conf);
}

FileFactory.FileType fileType = FileFactory.getFileType(path);
switch (fileType) {
case LOCAL:
return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.channels.FileChannel;
import java.util.Locale;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -52,10 +53,12 @@ public final class FileFactory {
configuration.addResource(new Path("../core-default.xml"));
}

private static FileTypeInterface fileFileTypeInterface = new DefaultFileTypeProvider();
public static void setFileTypeInterface(FileTypeInterface fileTypeInterface) {
private static DefaultFileTypeProvider fileFileTypeInterface = new DefaultFileTypeProvider();

public static void setFileTypeInterface(DefaultFileTypeProvider fileTypeInterface) {
fileFileTypeInterface = fileTypeInterface;
}

private FileFactory() {

}
@@ -73,11 +76,22 @@ public static Configuration getConfiguration() {
}

public static FileReader getFileHolder(FileType fileType) {
return fileFileTypeInterface.getFileHolder(fileType, getConfiguration());
return getFileHolder(fileType, getConfiguration());
}

public static FileReader getFileHolder(FileType fileType, Configuration configuration) {
return fileFileTypeInterface.getFileHolder(fileType, configuration);
public static FileReader getFileHolder(FileFactory.FileType fileType,
Configuration configuration) {
switch (fileType) {
case LOCAL:
return new FileReaderImpl();
case HDFS:
case ALLUXIO:
case VIEWFS:
case S3:
return new DFSFileReaderImpl(configuration);
default:
return new FileReaderImpl();
}
}

public static FileType getFileType(String path) {
@@ -89,6 +103,17 @@ public static FileType getFileType(String path) {
if (fileType != null) {
return fileType;
}

// If a custom file type is configured, check whether it supports this path.
if (fileFileTypeInterface.isPathSupported(path)) {
return FileType.CUSTOM;
}

// If it is an unsupported file system, throw an error instead of proceeding with wrong behavior.
if (path.contains("://") && !path.startsWith("file://")) {
throw new IllegalArgumentException("Path belongs to unsupported file system " + path);
}

return FileType.LOCAL;
}

@@ -124,14 +149,15 @@ private static FileType getFileTypeWithActualPath(String path) {
}

public static CarbonFile getCarbonFile(String path) {
return fileFileTypeInterface.getCarbonFile(path, getFileType(path));
return fileFileTypeInterface.getCarbonFile(path, getConfiguration());
}
public static CarbonFile getCarbonFile(String path, FileType fileType) {
return fileFileTypeInterface.getCarbonFile(path, fileType);
// TODO: fileType is ignored for now to avoid refactoring. Remove the unused argument later.
return fileFileTypeInterface.getCarbonFile(path, getConfiguration());
}
public static CarbonFile getCarbonFile(String path,
Configuration hadoopConf) {
return fileFileTypeInterface.getCarbonFile(path, getFileType(path), hadoopConf);
return fileFileTypeInterface.getCarbonFile(path, hadoopConf);
}

public static DataInputStream getDataInputStream(String path, FileType fileType)
@@ -229,12 +255,11 @@ public static DataOutputStream getDataOutputStream(String path, FileType fileTyp
* not if the performFileCheck is true
*
* @param filePath - Path
* @param fileType - FileType Local/HDFS
* @param performFileCheck - Provide false for folders, true for files and
*/
public static boolean isFileExist(String filePath, FileType fileType, boolean performFileCheck)
public static boolean isFileExist(String filePath, boolean performFileCheck)
throws IOException {
return getCarbonFile(filePath).isFileExist(filePath, fileType, performFileCheck);
return getCarbonFile(filePath).isFileExist(filePath, performFileCheck);
}

/**
@@ -244,7 +269,7 @@ public static boolean isFileExist(String filePath, FileType fileType, boolean pe
* @param fileType - FileType Local/HDFS
*/
public static boolean isFileExist(String filePath, FileType fileType) throws IOException {
return getCarbonFile(filePath).isFileExist(filePath, fileType);
return getCarbonFile(filePath).isFileExist(filePath);
}
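For context, a small usage sketch of the simplified API, assuming a custom provider like the earlier one is registered; the myfs:// path is hypothetical. Callers no longer pass a FileType, since the path alone selects the provider.

import java.io.IOException;

import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;

public final class CustomFsUsageSketch {
  public static void main(String[] args) throws IOException {
    // Resolves to the custom provider's CarbonFile because isPathSupported() matches the scheme.
    CarbonFile tableDir = FileFactory.getCarbonFile("myfs://store/default/t1");
    // New signature from this change: existence check without a FileType argument.
    boolean exists = FileFactory.isFileExist("myfs://store/default/t1/Metadata/schema", true);
    System.out.println(tableDir.getName() + " exists: " + exists);
  }
}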

/**
@@ -347,6 +372,7 @@ public static void truncateFile(String path, FileType fileType, long newSize) th
case HDFS:
case ALLUXIO:
case VIEWFS:
case CUSTOM:
case S3:
// if hadoop version >= 2.7, it can call method 'FileSystem.truncate' to truncate file,
// this method was new in hadoop 2.7, otherwise use CarbonFile.truncate to do this.
@@ -392,7 +418,7 @@ public static boolean createNewLockFile(String filePath, FileType fileType) thro
}

public enum FileType {
LOCAL, HDFS, ALLUXIO, VIEWFS, S3
LOCAL, HDFS, ALLUXIO, VIEWFS, S3, CUSTOM
}

/**
@@ -413,6 +439,7 @@ public static String addSchemeIfNotExists(String filePath) {
case ALLUXIO:
case VIEWFS:
case S3:
case CUSTOM:
default:
return filePath;
}
@@ -432,6 +459,7 @@ public static String getUpdatedFilePath(String filePath, FileType fileType) {
case HDFS:
case VIEWFS:
case S3:
case CUSTOM:
return filePath;
case ALLUXIO:
return StringUtils.startsWith(filePath, "alluxio") ? filePath : "alluxio:///" + filePath;
@@ -483,6 +511,7 @@ public static long getDirectorySize(String filePath) throws IOException {
case ALLUXIO:
case VIEWFS:
case S3:
case CUSTOM:
Path path = new Path(filePath);
FileSystem fs = path.getFileSystem(getConfiguration());
return fs.getContentSummary(path).getLength();
@@ -524,6 +553,7 @@ public static void createDirectoryAndSetPermission(String directoryPath, FsPermi
case HDFS:
case ALLUXIO:
case VIEWFS:
case CUSTOM:
try {
Path path = new Path(directoryPath);
FileSystem fs = path.getFileSystem(getConfiguration());
@@ -551,6 +581,10 @@ public static void createDirectoryAndSetPermission(String directoryPath, FsPermi
* Check and append the hadoop's defaultFS to the path
*/
public static String checkAndAppendDefaultFs(String path, Configuration conf) {
if (FileFactory.getFileType(path) == FileType.CUSTOM) {
// If it is a custom file type, the scheme is already present; no need to append one.
return path;
}
String defaultFs = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
String lowerPath = path.toLowerCase();
if (lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) || lowerPath
@@ -567,6 +601,28 @@ public static String checkAndAppendDefaultFs(String path, Configuration conf) {
}
}

/**
* Returns true if a URI scheme is present in the file path.
*
* @param path file path to check
* @return true if the path contains a scheme prefix
*/
public static boolean checkIfPrefixExists(String path) {
if (FileFactory.getFileType(path) == FileType.CUSTOM) {
// Custom file types always carry their own scheme, so the prefix is considered present.
return true;
}

final String lowerPath = path.toLowerCase(Locale.getDefault());
return lowerPath.contains("://") || lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
|| lowerPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || lowerPath
.startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX) || lowerPath
.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX) || lowerPath
.startsWith(CarbonCommonConstants.S3N_PREFIX) || lowerPath
.startsWith(CarbonCommonConstants.S3_PREFIX) || lowerPath
.startsWith(CarbonCommonConstants.S3A_PREFIX);
}

/**
* set the file replication
*