Skip to content

Commit

Permalink
support carbon.load.directWriteHdfs.enabled for S3
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Sep 7, 2018
1 parent 50248f5 commit 4ebc9e8
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ public final class CarbonLoadOptionConstants {
public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";

@CarbonProperty
public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS
= "carbon.load.directWriteHdfs.enabled";
public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT = "false";
public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH
= "carbon.load.directWriteToStorePath.enabled";
public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT = "false";

/**
* If the sort memory is insufficient, spill inmemory pages to disk.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.sdk.file.*;

/**
Expand All @@ -37,6 +39,12 @@ public static void main(String[] args) throws Exception {
System.exit(0);
}

String backupProperty = CarbonProperties.getInstance()
.getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
CarbonProperties.getInstance()
.addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true");

String path = "s3a://sdk/WriterOutput";
if (args.length > 3) {
path=args[3];
Expand Down Expand Up @@ -86,5 +94,9 @@ public static void main(String[] args) throws Exception {
}
System.out.println("\nFinished");
reader.close();

CarbonProperties.getInstance()
.addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
backupProperty);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,10 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {

test("test data loading with directly writing fact data to hdfs") {
val originStatus = CarbonProperties.getInstance().getProperty(
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT)
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT)
CarbonProperties.getInstance().addProperty(
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS, "true")
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true")

val testData = s"$resourcesPath/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
Expand All @@ -256,7 +256,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
)

CarbonProperties.getInstance().addProperty(
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
originStatus)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
protected WritableByteChannel fileChannel;
protected long currentOffsetInFile;
/**
* The path of CarbonData file to write in hdfs
* The path of CarbonData file to write in hdfs/s3
*/
private String carbonDataFileHdfsPath;
private String carbonDataFileStorePath;
/**
* The temp path of carbonData file used on executor
*/
Expand Down Expand Up @@ -145,9 +145,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
*/
protected DataMapWriterListener listener;
/**
* Whether directly write fact data to hdfs
* Whether directly write fact data to store path
*/
private boolean enableDirectlyWriteData2Hdfs = false;
private boolean enableDirectlyWriteDataToStorePath = false;

protected ExecutorService fallbackExecutorService;

Expand All @@ -172,11 +172,11 @@ public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {

// whether to directly write fact data to HDFS
String directlyWriteData2Hdfs = propInstance
.getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
this.enableDirectlyWriteData2Hdfs = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
.getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
this.enableDirectlyWriteDataToStorePath = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);

if (enableDirectlyWriteData2Hdfs) {
if (enableDirectlyWriteDataToStorePath) {
LOGGER.info("Carbondata will directly write fact data to HDFS.");
} else {
LOGGER.info("Carbondata will write temporary fact data to local disk.");
Expand Down Expand Up @@ -225,7 +225,7 @@ protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded)
if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
// set the current file size to zero
String activeFile =
enableDirectlyWriteData2Hdfs ? carbonDataFileHdfsPath : carbonDataFileTempPath;
enableDirectlyWriteDataToStorePath ? carbonDataFileStorePath : carbonDataFileTempPath;
LOGGER.info("Writing data to file as max file size reached for file: "
+ activeFile + ". Data block size: " + currentFileSize);
// write meta data to end of the existing file
Expand Down Expand Up @@ -269,7 +269,7 @@ private void notifyDataMapBlockEnd() {
protected void commitCurrentFile(boolean copyInCurrentThread) {
notifyDataMapBlockEnd();
CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
if (!enableDirectlyWriteData2Hdfs) {
if (!enableDirectlyWriteDataToStorePath) {
if (copyInCurrentThread) {
CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
model.getCarbonDataDirectoryPath(), fileSizeInBytes);
Expand All @@ -290,14 +290,14 @@ public void initializeWriter() throws CarbonDataWriterException {
.getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
"" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + File.separator
this.carbonDataFileStorePath = model.getCarbonDataDirectoryPath() + File.separator
+ carbonDataFileName;
try {
if (enableDirectlyWriteData2Hdfs) {
if (enableDirectlyWriteDataToStorePath) {
// the block size will be twice the block_size specified by user to make sure that
// one carbondata file only consists exactly one HDFS block.
fileOutputStream = FileFactory
.getDataOutputStream(carbonDataFileHdfsPath, FileFactory.FileType.HDFS,
.getDataOutputStream(carbonDataFileStorePath, FileFactory.FileType.HDFS,
CarbonCommonConstants.BYTEBUFFER_SIZE, fileSizeInBytes * 2);
} else {
//each time we initialize writer, we choose a local temp location randomly
Expand Down Expand Up @@ -374,11 +374,11 @@ protected void writeIndexFile() throws IOException, CarbonDataWriterException {
// get the block index info thrift
List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
String indexFileName;
if (enableDirectlyWriteData2Hdfs) {
String rawFileName = model.getCarbonDataDirectoryPath() + File.separator + CarbonTablePath
.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
"" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
if (enableDirectlyWriteDataToStorePath) {
String rawFileName = model.getCarbonDataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR
+ CarbonTablePath.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
"" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
indexFileName = FileFactory.getUpdatedFilePath(rawFileName, FileFactory.FileType.HDFS);
} else {
// randomly choose a temp location for index file
Expand All @@ -401,7 +401,7 @@ protected void writeIndexFile() throws IOException, CarbonDataWriterException {
writer.writeThrift(blockIndex);
}
writer.close();
if (!enableDirectlyWriteData2Hdfs) {
if (!enableDirectlyWriteDataToStorePath) {
CarbonUtil
.copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
fileSizeInBytes);
Expand Down

0 comments on commit 4ebc9e8

Please sign in to comment.