[CARBONDATA-3366] Support SDK reader to read blocklet level split
To provide more flexibility in the SDK reader, support for reading carbondata files at blocklet-level splits is required.
With this, the SDK reader can be used in a distributed or multi-threaded environment by creating a carbon reader in each worker at split level (blocklet split).
For this, new interfaces are added to CarbonReaderBuilder, as sketched below.
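A minimal usage sketch of the new split-level API, assuming the SDK classes under org.apache.carbondata.sdk.file; the class name, file path, and single-process loop are illustrative, and in a real deployment each split would be handed to its own worker or thread:

import java.io.IOException;
import org.apache.carbondata.sdk.file.CarbonReader;
import org.apache.hadoop.mapreduce.InputSplit;

public class BlockletSplitReadExample {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Obtain blocklet-level splits once, e.g. on a driver/coordinator.
    // The path is illustrative and must already contain carbondata files.
    InputSplit[] splits = CarbonReader.builder("/tmp/carbon_sdk_files").getSplits(true);
    for (InputSplit split : splits) {
      // Each split can be read independently by a separate worker or thread.
      CarbonReader reader = CarbonReader.builder(split).build();
      while (reader.hasNext()) {
        Object[] row = (Object[]) reader.readNextRow();
        // process row ...
      }
      reader.close();
    }
  }
}

Passing true to getSplits requests blocklet-level splits instead of the default block-level splits, which gives finer-grained parallelism across workers.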

This closes #3196
ajantha-bhat authored and xubo245 committed May 20, 2019
1 parent 24fe230 commit c2d4b3e
Showing 7 changed files with 326 additions and 47 deletions.
@@ -247,4 +247,8 @@ public Configuration getConfiguration() {
@Override public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}

@Override public String getFilePath() {
return carbonFilePath;
}
}
@@ -54,4 +54,12 @@ SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO upda
Configuration getConfiguration();

void setConfiguration(Configuration configuration);

/**
* Returns the table path if the ReadCommittedScope is a TableStatusReadCommittedScope,
* or the carbon file path if it is a LatestFilesReadCommittedScope.
*
* @return table path or carbon file path
*/
String getFilePath();
}
@@ -112,4 +112,8 @@ public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, Update
@Override public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}

@Override public String getFilePath() {
return identifier.getTablePath();
}
}
@@ -29,6 +29,7 @@
import org.apache.carbondata.hadoop.CarbonRecordReader;
import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;


@@ -147,6 +148,16 @@ public static CarbonReaderBuilder builder(String tablePath, String tableName) {
return new CarbonReaderBuilder(tablePath, tableName);
}

/**
* Return a new {@link CarbonReaderBuilder} instance
*
* @param inputSplit input split to read (expected to be a CarbonInputSplit)
* @return CarbonReaderBuilder object
*/
public static CarbonReaderBuilder builder(InputSplit inputSplit) {
return new CarbonReaderBuilder(inputSplit);
}

/**
* Return a new {@link CarbonReaderBuilder} instance
*
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -35,10 +37,12 @@
import org.apache.carbondata.core.util.CarbonSessionInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
@@ -57,6 +61,7 @@ public class CarbonReaderBuilder {
private String tableName;
private Configuration hadoopConf;
private boolean useVectorReader = true;
private InputSplit inputSplit;
private boolean useArrowReader;

/**
@@ -71,6 +76,10 @@ public class CarbonReaderBuilder {
ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
}

CarbonReaderBuilder(InputSplit inputSplit) {
this.inputSplit = inputSplit;
ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
}

/**
* Configure the projection column names of carbon reader
@@ -147,7 +156,6 @@ public CarbonReaderBuilder withBatch(int batch) {
public CarbonReaderBuilder withHadoopConf(String key, String value) {
if (this.hadoopConf == null) {
this.hadoopConf = new Configuration();

}
this.hadoopConf.set(key, value);
return this;
@@ -175,36 +183,32 @@ public <T> ArrowCarbonReader<T> buildArrowReader() throws IOException, Interrupt
return (ArrowCarbonReader<T>) this.build();
}

/**
* Build CarbonReader
*
* @param <T>
* @return CarbonReader
* @throws IOException
* @throws InterruptedException
*/
public <T> CarbonReader<T> build()
throws IOException, InterruptedException {
if (hadoopConf == null) {
hadoopConf = FileFactory.getConfiguration();
private CarbonFileInputFormat prepareFileInputFormat(Job job, boolean enableBlockletDistribution,
boolean disableLoadBlockDataMap) throws IOException {
if (inputSplit != null && inputSplit instanceof CarbonInputSplit) {
tablePath =
((CarbonInputSplit) inputSplit).getSegment().getReadCommittedScope().getFilePath();
tableName = "UnknownTable" + UUID.randomUUID();
}
CarbonTable table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
if (enableBlockletDistribution) {
// set cache level to blocklet level
Map<String, String> tableProperties =
table.getTableInfo().getFactTable().getTableProperties();
tableProperties.put(CarbonCommonConstants.CACHE_LEVEL, "BLOCKLET");
table.getTableInfo().getFactTable().setTableProperties(tableProperties);
}
CarbonTable table;
// now always infer schema. TODO:Refactor in next version.
table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
final CarbonFileInputFormat format = new CarbonFileInputFormat();
final Job job = new Job(hadoopConf);
format.setTableInfo(job.getConfiguration(), table.getTableInfo());
format.setTablePath(job.getConfiguration(), table.getTablePath());
format.setTableName(job.getConfiguration(), table.getTableName());
format.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
if (filterExpression != null) {
format.setFilterPredicates(job.getConfiguration(), filterExpression);
}

if (projectionColumns != null) {
// set the user projection
int len = projectionColumns.length;
// TODO : Handle projection of complex child columns
for (int i = 0; i < len; i++) {
if (projectionColumns[i].contains(".")) {
throw new UnsupportedOperationException(
@@ -213,40 +217,66 @@ public <T> CarbonReader<T> build()
}
format.setColumnProjection(job.getConfiguration(), projectionColumns);
}
if ((disableLoadBlockDataMap) && (filterExpression == null)) {
job.getConfiguration().set("filter_blocks", "false");
}
return format;
}

private <T> RecordReader getRecordReader(Job job, CarbonFileInputFormat format,
List<RecordReader<Void, T>> readers, InputSplit split)
throws IOException, InterruptedException {
TaskAttemptContextImpl attempt =
new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader;
QueryModel queryModel = format.createQueryModel(split, attempt);
boolean hasComplex = false;
for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) {
if (projectionDimension.getDimension().isComplex()) {
hasComplex = true;
break;
}
}
if (useVectorReader && !hasComplex) {
queryModel.setDirectVectorFill(filterExpression == null);
reader = new CarbonVectorizedRecordReader(queryModel);
} else {
reader = format.createRecordReader(split, attempt);
}
try {
reader.initialize(split, attempt);
} catch (Exception e) {
CarbonUtil.closeStreams(readers.toArray(new RecordReader[0]));
throw e;
}
return reader;
}

if (filterExpression == null) {
job.getConfiguration().set("filter_blocks", "false");
}
/**
* Build CarbonReader
*
* @param <T> type of the records to read
* @return CarbonReader instance
* @throws IOException if the carbon files cannot be read
* @throws InterruptedException if the underlying job is interrupted
*/
public <T> CarbonReader<T> build()
throws IOException, InterruptedException {
if (inputSplit != null) {
return buildWithSplits(inputSplit);
}
if (hadoopConf == null) {
hadoopConf = FileFactory.getConfiguration();
}
final Job job = new Job(new JobConf(hadoopConf));
CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
try {
List<InputSplit> splits =
format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
for (InputSplit split : splits) {
TaskAttemptContextImpl attempt =
new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader;
QueryModel queryModel = format.createQueryModel(split, attempt);
boolean hasComplex = false;
for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) {
if (projectionDimension.getDimension().isComplex()) {
hasComplex = true;
break;
}
}
if (useVectorReader && !hasComplex) {
queryModel.setDirectVectorFill(filterExpression == null);
reader = new CarbonVectorizedRecordReader(queryModel);
} else {
reader = format.createRecordReader(split, attempt);
}
try {
reader.initialize(split, attempt);
readers.add(reader);
} catch (Exception e) {
CarbonUtil.closeStreams(readers.toArray(new RecordReader[0]));
throw e;
}
RecordReader reader = getRecordReader(job, format, readers, split);
readers.add(reader);
}
if (useArrowReader) {
return new ArrowCarbonReader<>(readers);
@@ -256,9 +286,54 @@ public <T> CarbonReader<T> build()
} catch (Exception ex) {
// Clear the datamap cache as it can get added in getSplits() method
DataMapStoreManager.getInstance()
.clearDataMaps(table.getAbsoluteTableIdentifier());
.clearDataMaps(format.getAbsoluteTableIdentifier(hadoopConf));
throw ex;
}
}


private <T> CarbonReader<T> buildWithSplits(InputSplit inputSplit)
throws IOException, InterruptedException {
if (hadoopConf == null) {
hadoopConf = FileFactory.getConfiguration();
}
final Job job = new Job(new JobConf(hadoopConf));
CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
try {
List<RecordReader<Void, T>> readers = new ArrayList<>(1);
RecordReader reader = getRecordReader(job, format, readers, inputSplit);
readers.add(reader);
if (useArrowReader) {
return new ArrowCarbonReader<>(readers);
} else {
return new CarbonReader<>(readers);
}
} catch (Exception ex) {
// Clear the datamap cache as it can get added in getSplits() method
DataMapStoreManager.getInstance()
.clearDataMaps(format.getAbsoluteTableIdentifier(hadoopConf));
throw ex;
}
}

/**
* Gets an array of CarbonInputSplits.
* In carbondata, splits can be block level or blocklet level.
* By default, splits are block level.
*
* @param enableBlockletDistribution if true, returns blocklet level splits,
* otherwise block level splits
* @return array of InputSplits for the carbondata files
* @throws IOException if the splits cannot be computed
*/
public InputSplit[] getSplits(boolean enableBlockletDistribution) throws IOException {
if (hadoopConf == null) {
hadoopConf = FileFactory.getConfiguration();
}
final Job job = new Job(new JobConf(hadoopConf));
CarbonFileInputFormat format = prepareFileInputFormat(job, enableBlockletDistribution, false);
List<InputSplit> splits =
format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
return splits.toArray(new InputSplit[splits.size()]);
}
}
@@ -27,6 +27,7 @@
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.log4j.Logger;

import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
@@ -2553,4 +2554,36 @@ public void testArrowReader() {
}
}


@Test
public void testReadBlocklet() throws IOException, InterruptedException {
String path = "./testWriteFiles/" + System.nanoTime();
FileUtils.deleteDirectory(new File(path));

Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);

TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 1, 100);

InputSplit[] splits = CarbonReader.builder(path).getSplits(true);
// expect 3 blocklet-level splits (only one carbon file is created)
Assert.assertEquals(splits.length, 3);

int totalCount = 0;
for (int k = 0; k < splits.length; k++) {
CarbonReader reader = CarbonReader
.builder(splits[k])
.build();
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
i++;
}
totalCount += i;
reader.close();
}
Assert.assertEquals(totalCount, 1000000);
FileUtils.deleteDirectory(new File(path));
}
}
