Skip to content

Commit

Permalink
added s3 carbon file
Browse files Browse the repository at this point in the history
  • Loading branch information
kunal642 committed Jun 25, 2018
1 parent ca466d9 commit 0665c05
Show file tree
Hide file tree
Showing 34 changed files with 400 additions and 219 deletions.
Expand Up @@ -729,6 +729,13 @@ public final class CarbonCommonConstants {
@CarbonProperty
public static final String LOCK_TYPE = "carbon.lock.type";

/**
* Specifies the path where the lock files have to be created.
* By default, lock files are created in table path.
*/
@CarbonProperty
public static final String LOCK_PATH = "carbon.lock.path";

/**
* ZOOKEEPER_ENABLE_DEFAULT the default value for zookeeper will be true for carbon
*/
Expand Down Expand Up @@ -963,9 +970,9 @@ public final class CarbonCommonConstants {
public static final String CARBON_LOCK_TYPE_HDFS = "HDFSLOCK";

/**
* S3LOCK TYPE
* MEMORY LOCK TYPE
*/
public static final String CARBON_LOCK_TYPE_S3 = "S3LOCK";
public static final String CARBON_LOCK_TYPE_MEMORY = "MEMORYLOCK";

/**
* Invalid filter member log string
Expand Down
Expand Up @@ -407,7 +407,7 @@ public CarbonTable getCarbonTable(AbsoluteTableIdentifier identifier) {
try {
carbonTable = CarbonTable
.buildFromTablePath(identifier.getTableName(), identifier.getDatabaseName(),
identifier.getTablePath());
identifier.getTablePath(), identifier.getCarbonTableIdentifier().getTableId());
} catch (IOException e) {
LOGGER.error("failed to get carbon table from table Path");
// ignoring exception
Expand Down
Expand Up @@ -189,6 +189,6 @@ private static void writeLoadDetailsIntoFile(String location,
private static ICarbonLock getDataMapStatusLock() {
return CarbonLockFactory
.getCarbonLockObj(CarbonProperties.getInstance().getSystemFolderLocation(),
LockUsage.DATAMAP_STATUS_LOCK);
LockUsage.DATAMAP_STATUS_LOCK, "1");
}
}
Expand Up @@ -269,17 +269,7 @@ public boolean delete() {
// append to a file only if file already exists else file not found
// exception will be thrown by hdfs
if (CarbonUtil.isFileExists(path)) {
if (FileFactory.FileType.S3 == fileType) {
DataInputStream dataInputStream = fileSystem.open(pt);
int count = dataInputStream.available();
// create buffer
byte[] byteStreamBuffer = new byte[count];
int bytesRead = dataInputStream.read(byteStreamBuffer);
stream = fileSystem.create(pt, true, bufferSize);
stream.write(byteStreamBuffer, 0, bytesRead);
} else {
stream = fileSystem.append(pt, bufferSize);
}
stream = fileSystem.append(pt, bufferSize);
} else {
stream = fileSystem.create(pt, true, bufferSize);
}
Expand Down Expand Up @@ -461,7 +451,7 @@ public boolean createNewFile(String filePath, FileFactory.FileType fileType, boo
return fs.delete(path, true);
}

@Override public boolean mkdirs(String filePath, FileFactory.FileType fileType)
@Override public boolean mkdirs(String filePath)
throws IOException {
filePath = filePath.replace("\\", "/");
Path path = new Path(filePath);
Expand Down
Expand Up @@ -144,7 +144,7 @@ boolean createNewFile(String filePath, FileFactory.FileType fileType, boolean do

boolean deleteFile(String filePath, FileFactory.FileType fileType) throws IOException;

boolean mkdirs(String filePath, FileFactory.FileType fileType) throws IOException;
boolean mkdirs(String filePath) throws IOException;

DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.FileType fileType)
throws IOException;
Expand Down
Expand Up @@ -120,10 +120,6 @@ public boolean renameForce(String changetoName) {
((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
org.apache.hadoop.fs.Options.Rename.OVERWRITE);
return true;
} else if (fileStatus.getPath().toString().startsWith("s3n")
|| fileStatus.getPath().toString().startsWith("s3a")) {
fs.delete(new Path(changetoName), true);
return fs.rename(fileStatus.getPath(), new Path(changetoName));
} else {
return fs.rename(fileStatus.getPath(), new Path(changetoName));
}
Expand Down
Expand Up @@ -433,10 +433,10 @@ public boolean createNewFile(String filePath, FileFactory.FileType fileType, boo
return FileFactory.deleteAllFilesOfDir(file);
}

@Override public boolean mkdirs(String filePath, FileFactory.FileType fileType)
/**
 * Creates the directory at {@code filePath}, including any missing parent
 * directories, on the local file system.
 *
 * @param filePath target directory path; Windows-style separators are accepted
 * @return true if the directory tree was created
 */
@Override public boolean mkdirs(String filePath)
    throws IOException {
  // Normalize back-slashes first, then let FileFactory strip/adjust any
  // scheme-specific prefix before touching the local file system.
  String normalizedPath = FileFactory.getUpdatedFilePath(filePath.replace("\\", "/"));
  return new File(normalizedPath).mkdirs();
}
Expand Down
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.filesystem;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.CarbonUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class S3CarbonFile extends HDFSCarbonFile {

private static final LogService LOGGER =
LogServiceFactory.getLogService(HDFSCarbonFile.class.getName());

public S3CarbonFile(String filePath) {
super(filePath);
}

public S3CarbonFile(String filePath, Configuration hadoopConf) {
super(filePath, hadoopConf);
}

public S3CarbonFile(Path path) {
super(path);
}

public S3CarbonFile(Path path, Configuration hadoopConf) {
super(path, hadoopConf);
}

public S3CarbonFile(FileStatus fileStatus) {
super(fileStatus);
}

@Override
public boolean renameForce(String changetoName) {
FileSystem fs;
try {
fs = fileStatus.getPath().getFileSystem(hadoopConf);
return fs.rename(fileStatus.getPath(), new Path(changetoName));
} catch (IOException e) {
LOGGER.error("Exception occured: " + e.getMessage());
return false;
}
}

@Override
public DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.FileType fileType)
throws IOException {
return getDataOutputStream(path, fileType, CarbonCommonConstants.BYTEBUFFER_SIZE, true);
}

@Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
int bufferSize, boolean append) throws IOException {
Path pt = new Path(path);
FileSystem fileSystem = pt.getFileSystem(FileFactory.getConfiguration());
FSDataOutputStream stream;
if (append) {
// append to a file only if file already exists else file not found
// exception will be thrown by hdfs
if (CarbonUtil.isFileExists(path)) {
DataInputStream dataInputStream = fileSystem.open(pt);
int count = dataInputStream.available();
// create buffer
byte[] byteStreamBuffer = new byte[count];
int bytesRead = dataInputStream.read(byteStreamBuffer);
stream = fileSystem.create(pt, true, bufferSize);
stream.write(byteStreamBuffer, 0, bytesRead);
} else {
stream = fileSystem.create(pt, true, bufferSize);
}
} else {
stream = fileSystem.create(pt, true, bufferSize);
}
return stream;
}

@Override
public CarbonFile getParentFile() {
Path parent = fileStatus.getPath().getParent();
return null == parent ? null : new S3CarbonFile(parent, hadoopConf);
}

@Override
protected CarbonFile[] getFiles(FileStatus[] listStatus) {
if (listStatus == null) {
return new CarbonFile[0];
}
CarbonFile[] files = new CarbonFile[listStatus.length];
for (int i = 0; i < files.length; i++) {
files[i] = new S3CarbonFile(listStatus[i]);
}
return files;
}

@Override
protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus)
throws IOException {
List<CarbonFile> carbonFiles = new ArrayList<>();
while (listStatus.hasNext()) {
Path filePath = listStatus.next().getPath();
carbonFiles.add(new S3CarbonFile(filePath));
}
return carbonFiles;
}

}
Expand Up @@ -22,6 +22,7 @@
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile;
import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -47,8 +48,9 @@ public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType) {
case LOCAL:
return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
case HDFS:
case S3:
return new HDFSCarbonFile(path);
case S3:
return new S3CarbonFile(path);
case ALLUXIO:
return new AlluxioCarbonFile(path);
case VIEWFS:
Expand All @@ -63,8 +65,9 @@ public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Conf
case LOCAL:
return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
case HDFS:
case S3:
return new HDFSCarbonFile(path, conf);
case S3:
return new S3CarbonFile(path, conf);
case ALLUXIO:
return new AlluxioCarbonFile(path);
case VIEWFS:
Expand Down
Expand Up @@ -256,7 +256,7 @@ public static boolean deleteAllCarbonFilesOfDir(CarbonFile path) {
}

/**
 * Creates the directory (and any missing parents) at {@code filePath}.
 * {@code fileType} is retained for source compatibility with existing
 * callers but is no longer consulted — the backing CarbonFile
 * implementation is resolved from the path itself.
 */
public static boolean mkdirs(String filePath, FileType fileType) throws IOException {
  CarbonFile carbonFile = getCarbonFile(filePath);
  return carbonFile.mkdirs(filePath);
}

/**
Expand All @@ -269,15 +269,7 @@ public static boolean mkdirs(String filePath, FileType fileType) throws IOExcept
*/
public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType)
throws IOException {
if (FileType.S3 == fileType) {
CarbonFile carbonFile = getCarbonFile(path);
if (carbonFile.exists()) {
carbonFile.delete();
}
return carbonFile.getDataOutputStream(path,fileType);
} else {
return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType);
}
return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType);
}

/**
Expand Down
Expand Up @@ -39,8 +39,11 @@ public class CarbonLockFactory {
*/
private static String lockTypeConfigured;

private static String lockPath;

static {
CarbonLockFactory.getLockTypeConfigured();
getLockpath();
}

/**
Expand All @@ -52,44 +55,57 @@ public class CarbonLockFactory {
*/
public static ICarbonLock getCarbonLockObj(AbsoluteTableIdentifier absoluteTableIdentifier,
String lockFile) {

String tablePath = absoluteTableIdentifier.getTablePath();
String tablePath;
if (lockPath.isEmpty()) {
tablePath = absoluteTableIdentifier.getTablePath();
} else {
if (absoluteTableIdentifier
.getCarbonTableIdentifier().getTableId().isEmpty()) {
throw new RuntimeException("Table id is empty");
}
tablePath = lockPath + CarbonCommonConstants.FILE_SEPARATOR + absoluteTableIdentifier
.getCarbonTableIdentifier().getTableId();
}
if (lockTypeConfigured.equals(CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER)) {
return new ZooKeeperLocking(absoluteTableIdentifier, lockFile);
} else if (tablePath.startsWith(CarbonCommonConstants.S3A_PREFIX) ||
tablePath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
tablePath.startsWith(CarbonCommonConstants.S3_PREFIX)) {
lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_S3;
return new S3FileLock(absoluteTableIdentifier, lockFile);
return new ZooKeeperLocking(tablePath, lockFile);
} else if (tablePath.startsWith(CarbonCommonConstants.S3A_PREFIX) || tablePath
.startsWith(CarbonCommonConstants.S3N_PREFIX) || tablePath
.startsWith(CarbonCommonConstants.S3_PREFIX) || lockTypeConfigured
.equals(CarbonCommonConstants.CARBON_LOCK_TYPE_MEMORY)) {
lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_MEMORY;
return new MemoryLock(absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName(),
lockFile);
} else if (tablePath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS;
return new HdfsFileLock(absoluteTableIdentifier, lockFile);
return new HdfsFileLock(tablePath, lockFile);
} else {
lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL;
return new LocalFileLock(absoluteTableIdentifier, lockFile);
return new LocalFileLock(tablePath, lockFile);
}
}

/**
*
* @param locFileLocation
* @param lockFile
* @return carbon lock
*/
public static ICarbonLock getCarbonLockObj(String locFileLocation, String lockFile) {
public static ICarbonLock getCarbonLockObj(String locFileLocation, String lockFile,
String tableId) {
String lockFileLocation;
if (lockPath.isEmpty()) {
lockFileLocation = locFileLocation;
} else {
lockFileLocation = lockPath + CarbonCommonConstants.FILE_SEPARATOR + tableId;
}
switch (lockTypeConfigured) {
case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
return new LocalFileLock(locFileLocation, lockFile);

return new LocalFileLock(lockFileLocation, lockFile);
case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
return new ZooKeeperLocking(locFileLocation, lockFile);

return new ZooKeeperLocking(lockFileLocation, lockFile);
case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
return new HdfsFileLock(locFileLocation, lockFile);

case CarbonCommonConstants.CARBON_LOCK_TYPE_S3:
return new S3FileLock(locFileLocation, lockFile);

return new HdfsFileLock(lockFileLocation, lockFile);
case CarbonCommonConstants.CARBON_LOCK_TYPE_MEMORY:
return new MemoryLock(tableId, lockFile);
default:
throw new UnsupportedOperationException("Not supported the lock type");
}
Expand All @@ -105,4 +121,10 @@ private static void getLockTypeConfigured() {
LOGGER.info("Configured lock type is: " + lockTypeConfigured);
}

/**
 * Loads the configured lock-file location ({@code carbon.lock.path}) into
 * {@code lockPath}; empty when the property is unset, which means lock files
 * go under the table path.
 *
 * Fixed: the value was previously upper-cased, but lock paths live on
 * case-sensitive file systems (HDFS, S3, local Linux) and the scheme checks
 * in getCarbonLockObj compare against lowercase prefixes such as "s3a://"
 * and "hdfs://" — an upper-cased path would point at the wrong location and
 * defeat those checks. The configured value is now used verbatim.
 */
private static void getLockpath() {
  lockPath = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.LOCK_PATH, "");
}

}

0 comments on commit 0665c05

Please sign in to comment.