[HOTFIX] support "carbon.load.directWriteHdfs.enabled" for S3 #2697

Closed
@@ -136,9 +136,9 @@ public final class CarbonLoadOptionConstants {
public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";

@CarbonProperty
public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS
= "carbon.load.directWriteHdfs.enabled";
public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT = "false";
public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH
= "carbon.load.directWriteToStorePath.enabled";
public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT = "false";

/**
* If the sort memory is insufficient, spill inmemory pages to disk.
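The rename is mechanical: the lookup pattern for the flag is unchanged, only the key and constant names differ. A minimal sketch of how the flag is read, mirroring the `CarbonProperties` lookup that `AbstractFactDataWriter` performs later in this diff (the wrapper class name is hypothetical):

```java
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public class DirectWriteFlagCheck {
  public static void main(String[] args) {
    // Fetch the renamed property; the second argument supplies the
    // default ("false") when the key is not set.
    String value = CarbonProperties.getInstance().getProperty(
        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
    // Case-insensitive parse, matching the "TRUE".equalsIgnoreCase(...)
    // check used by the fact data writer below.
    boolean directWrite = "TRUE".equalsIgnoreCase(value);
    System.out.println("direct write to store path: " + directWrite);
  }
}
```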
docs/configuration-parameters.md (2 changes: 1 addition & 1 deletion)
@@ -89,7 +89,7 @@ This section provides the details of all the configurations required for the CarbonData
| carbon.prefetch.buffersize | 1000 | When the configuration ***carbon.merge.sort.prefetch*** is configured to true, we need to set the number of records that can be prefetched. This configuration is used to specify the number of records to be prefetched. **NOTE:** Configuring a higher number of records to be prefetched increases the memory footprint, as more records have to be kept in memory. |
| load_min_size_inmb | 256 | This configuration is used along with ***carbon.load.min.size.enabled***. It determines the minimum size of input files to be considered for distribution among executors during data loading. **NOTE:** Refer to ***carbon.load.min.size.enabled*** to understand when this configuration needs to be used and its advantages and disadvantages. |
| carbon.load.sortmemory.spill.percentage | 0 | During data loading, some data pages are kept in memory up to the memory configured in ***carbon.sort.storage.inmemory.size.inmb***, beyond which they are spilled to disk as intermediate temporary sort files. This configuration determines at what percentage the data needs to be spilled to disk. **NOTE:** Without this configuration, when the data pages occupy up to the configured memory, new data pages are dumped to disk while the old pages are still maintained on disk. |
| carbon.load.directWriteHdfs.enabled | false | During data load, all the carbondata files are written to local disk and finally copied to the target location in HDFS. Enabling this parameter causes carbondata files to be written directly to the target HDFS location, bypassing the local disk. **NOTE:** Writing directly to HDFS saves local disk IO (once for writing the files and again for copying them to HDFS), thereby improving performance. The drawback is that when data loading fails or the application crashes, unwanted carbondata files remain in the target HDFS location until they are cleared during the next data load or by running the *CLEAN FILES* DDL command |
| carbon.load.directWriteToStorePath.enabled | false | During data load, all the carbondata files are written to local disk and finally copied to the target store location in HDFS/S3. Enabling this parameter causes carbondata files to be written directly to the target HDFS/S3 location, bypassing the local disk. **NOTE:** Writing directly to HDFS/S3 saves local disk IO (once for writing the files and again for copying them to HDFS/S3), thereby improving performance. The drawback is that when data loading fails or the application crashes, unwanted carbondata files remain in the target HDFS/S3 location until they are cleared during the next data load or by running the *CLEAN FILES* DDL command |
| carbon.options.serialization.null.format | \N | Based on the business scenario, some columns might need to be loaded with null values. As null values cannot be written in CSV files, some special characters might be adopted to specify them. This configuration can be used to specify the null value format in the data being loaded. |
| carbon.sort.storage.inmemory.size.inmb | 512 | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure the memory footprint is within limits. When the ***enable.unsafe.sort*** configuration is enabled, instead of using ***carbon.sort.size***, which is based on row count, the size occupied in memory is used to determine when to flush data pages to intermediate temp files. This configuration determines the memory to be used for storing data pages in memory. **NOTE:** Configuring a higher value ensures more data is maintained in memory and hence increases data loading performance due to reduced or no IO. Configure the value according to the memory available on the nodes of the cluster. |
| carbon.column.compressor | snappy | CarbonData will compress the column values using the compressor specified by this configuration. Currently CarbonData supports 'snappy' and 'zstd' compressors. |
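Besides editing `carbon.properties`, the flag can be toggled programmatically before a load, which is how the S3 example and the test below opt in. A hedged sketch, assuming only the `CarbonProperties` API shown elsewhere in this diff (class name illustrative):

```java
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public class DirectWriteToggle {
  public static void main(String[] args) {
    // JVM-wide switch: subsequent loads write carbondata files straight
    // to the store path (HDFS/S3) instead of staging them on local disk.
    CarbonProperties.getInstance().addProperty(
        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true");
  }
}
```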
@@ -19,10 +19,12 @@

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.sdk.file.*;

import org.apache.hadoop.conf.Configuration;
@@ -40,6 +42,12 @@ public static void main(String[] args) throws Exception {
System.exit(0);
}

String backupProperty = CarbonProperties.getInstance()
.getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
CarbonProperties.getInstance()
.addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true");

String path = "s3a://sdk/WriterOutput";
if (args.length > 3) {
path=args[3];
@@ -87,5 +95,9 @@ public static void main(String[] args) throws Exception {
}
System.out.println("\nFinished");
reader.close();

CarbonProperties.getInstance()
.addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
backupProperty);
}
}
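Note that the example restores `backupProperty` only on the success path: if the write or read throws, the JVM-wide property stays `"true"`. A `try`/`finally` variant would make the restore unconditional; a sketch under that assumption (not part of this PR), using the imports already added above:

```java
String backupProperty = CarbonProperties.getInstance().getProperty(
    CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
    CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
CarbonProperties.getInstance().addProperty(
    CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true");
try {
  // ... build the writer, load the data, and read it back as in main() above ...
} finally {
  // Runs even when the write or read throws, so the global setting
  // cannot leak into later tests or jobs.
  CarbonProperties.getInstance().addProperty(
      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, backupProperty);
}
```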
@@ -243,10 +243,10 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {

test("test data loading with directly writing fact data to hdfs") {
val originStatus = CarbonProperties.getInstance().getProperty(
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT)
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT)
CarbonProperties.getInstance().addProperty(
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS, "true")
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true")

val testData = s"$resourcesPath/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
@@ -256,7 +256,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
)

CarbonProperties.getInstance().addProperty(
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
originStatus)
}

@@ -66,9 +66,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
protected WritableByteChannel fileChannel;
protected long currentOffsetInFile;
/**
* The path of CarbonData file to write in hdfs
* The path of CarbonData file to write in hdfs/s3
*/
private String carbonDataFileHdfsPath;
private String carbonDataFileStorePath;
/**
* The temp path of carbonData file used on executor
*/
@@ -145,9 +145,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
*/
protected DataMapWriterListener listener;
/**
* Whether directly write fact data to hdfs
* Whether directly write fact data to store path
*/
private boolean enableDirectlyWriteData2Hdfs = false;
private boolean enableDirectlyWriteDataToStorePath = false;

protected ExecutorService fallbackExecutorService;

@@ -172,11 +172,11 @@ public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {

// whether to directly write fact data to HDFS
String directlyWriteData2Hdfs = propInstance
.getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
this.enableDirectlyWriteData2Hdfs = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
.getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
this.enableDirectlyWriteDataToStorePath = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);

if (enableDirectlyWriteData2Hdfs) {
if (enableDirectlyWriteDataToStorePath) {
LOGGER.info("Carbondata will directly write fact data to HDFS.");
} else {
LOGGER.info("Carbondata will write temporary fact data to local disk.");
@@ -225,7 +225,7 @@ protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded)
if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
// set the current file size to zero
String activeFile =
enableDirectlyWriteData2Hdfs ? carbonDataFileHdfsPath : carbonDataFileTempPath;
enableDirectlyWriteDataToStorePath ? carbonDataFileStorePath : carbonDataFileTempPath;
LOGGER.info("Writing data to file as max file size reached for file: "
+ activeFile + ". Data block size: " + currentFileSize);
// write meta data to end of the existing file
@@ -269,7 +269,7 @@ private void notifyDataMapBlockEnd() {
protected void commitCurrentFile(boolean copyInCurrentThread) {
notifyDataMapBlockEnd();
CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
if (!enableDirectlyWriteData2Hdfs) {
if (!enableDirectlyWriteDataToStorePath) {
try {
if (copyInCurrentThread) {
CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
@@ -296,14 +296,14 @@ public void initializeWriter() throws CarbonDataWriterException {
.getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
"" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + File.separator
this.carbonDataFileStorePath = model.getCarbonDataDirectoryPath() + File.separator
+ carbonDataFileName;
try {
if (enableDirectlyWriteData2Hdfs) {
if (enableDirectlyWriteDataToStorePath) {
// the block size will be twice the block_size specified by user to make sure that
// one carbondata file only consists exactly one HDFS block.
fileOutputStream = FileFactory
.getDataOutputStream(carbonDataFileHdfsPath, FileFactory.FileType.HDFS,
.getDataOutputStream(carbonDataFileStorePath, FileFactory.FileType.HDFS,
CarbonCommonConstants.BYTEBUFFER_SIZE, fileSizeInBytes * 2);
} else {
//each time we initialize writer, we choose a local temp location randomly
@@ -380,11 +380,11 @@ protected void writeIndexFile() throws IOException, CarbonDataWriterException {
// get the block index info thrift
List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
String indexFileName;
if (enableDirectlyWriteData2Hdfs) {
String rawFileName = model.getCarbonDataDirectoryPath() + File.separator + CarbonTablePath
.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
"" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
if (enableDirectlyWriteDataToStorePath) {
String rawFileName = model.getCarbonDataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR
+ CarbonTablePath.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
model.getBucketId(), model.getTaskExtension(),
"" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
indexFileName = FileFactory.getUpdatedFilePath(rawFileName, FileFactory.FileType.HDFS);
} else {
// randomly choose a temp location for index file
@@ -407,7 +407,7 @@ protected void writeIndexFile() throws IOException, CarbonDataWriterException {
writer.writeThrift(blockIndex);
}
writer.close();
if (!enableDirectlyWriteData2Hdfs) {
if (!enableDirectlyWriteDataToStorePath) {
CarbonUtil
.copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
fileSizeInBytes);
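One detail worth calling out in the last hunk: the index-file path now uses `CarbonCommonConstants.FILE_SEPARATOR` instead of `java.io.File.separator`. `File.separator` is the platform separator of the JVM running the driver (`\` on Windows), whereas HDFS and S3 paths always use `/`, so the constant keeps store paths valid on every platform. A hedged illustration with made-up directory and file names:

```java
import java.io.File;

import org.apache.carbondata.core.constants.CarbonCommonConstants;

public class SeparatorDemo {
  public static void main(String[] args) {
    // Hypothetical directory and index file name, for illustration only.
    String dir = "s3a://sdk/WriterOutput/Fact/Part0/Segment_0";
    String indexFile = "0_batchno0-0-0-123.carbonindex";

    // Platform-dependent: "\" on Windows, which breaks an s3a:// key.
    String platformPath = dir + File.separator + indexFile;

    // Always "/", valid for HDFS URIs and S3 object keys on any platform.
    String storePath = dir + CarbonCommonConstants.FILE_SEPARATOR + indexFile;

    System.out.println(platformPath);
    System.out.println(storePath);
  }
}
```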