Commit
[CARBONDATA-3363] SDK supports reading carbon data by a given file list, file, or folder

This closes #3194
xubo245 authored and zzcclp committed May 22, 2019
1 parent df71291 commit 7541ef2
Showing 7 changed files with 404 additions and 53 deletions.
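
For orientation, the diffs below add a no-argument CarbonReader.builder() plus withFolder(...), withFileLists(...) and withFile(...) on CarbonReaderBuilder. A minimal sketch of the file-list variant, using placeholder local paths rather than anything taken from the commit itself:

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.sdk.file.CarbonReader;

// Sketch only: reads a fixed list of carbondata files with the builder methods added in
// this commit. The local paths below are placeholders.
public class ReadByFileListSketch {
  public static void main(String[] args) throws Exception {
    List<String> files = new ArrayList<>();
    files.add("/tmp/carbon/output/part-0-0.carbondata");
    files.add("/tmp/carbon/output/part-0-1.carbondata");

    CarbonReader reader = CarbonReader
        .builder()                                 // no table path or table name required
        .withFileLists(files)                      // read exactly these files
        .projection(new String[]{"name", "age"})   // assumes the files contain these columns
        .build();

    while (reader.hasNext()) {
      Object[] row = (Object[]) reader.readNextRow();
      System.out.println(row[0] + " " + row[1]);
    }
    reader.close();
  }
}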
@@ -17,16 +17,20 @@

package org.apache.carbondata.examples.sdk;

import java.util.List;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.sdk.file.CarbonReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;
import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
@@ -35,60 +39,69 @@
 * Example for testing carbonReader on S3
 */
public class SDKS3ReadExample {
  public static void main(String[] args) throws Exception {
    Logger logger = LogServiceFactory.getLogService(SDKS3ReadExample.class.getName());
    if (args == null || args.length < 3) {
      logger.error("Usage: java CarbonS3Example: <access-key> <secret-key>"
          + "<s3-endpoint> [table-path-on-s3]");
      System.exit(0);
    }

    String path = "s3a://sdk/WriterOutput/carbondata5";
    if (args.length > 3) {
      path = args[3];
    }

    // 1. read with file list
    Configuration conf = new Configuration();
    conf.set(ACCESS_KEY, args[0]);
    conf.set(SECRET_KEY, args[1]);
    conf.set(ENDPOINT, args[2]);
    List fileLists = listFiles(path, CarbonTablePath.CARBON_DATA_EXT, conf);

    // Read data
    EqualToExpression equalToExpression = new EqualToExpression(
        new ColumnExpression("name", DataTypes.STRING),
        new LiteralExpression("robot1", DataTypes.STRING));

    CarbonReader reader = CarbonReader
        .builder()
        .projection(new String[]{"name", "age"})
        .filter(equalToExpression)
        .withHadoopConf(ACCESS_KEY, args[0])
        .withHadoopConf(SECRET_KEY, args[1])
        .withHadoopConf(ENDPOINT, args[2])
        .withFileLists(fileLists)
        .build();

    System.out.println("\nData:");
    int i = 0;
    while (i < 20 && reader.hasNext()) {
      Object[] row = (Object[]) reader.readNextRow();
      System.out.println(row[0] + " " + row[1]);
      i++;
    }
    System.out.println("\nFinished");
    reader.close();

    // Read without filter
    CarbonReader reader2 = CarbonReader
        .builder(path, "_temp")
        .projection(new String[]{"name", "age"})
        .withHadoopConf(ACCESS_KEY, args[0])
        .withHadoopConf(SECRET_KEY, args[1])
        .withHadoopConf(ENDPOINT, args[2])
        .build();

    System.out.println("\nData:");
    i = 0;
    while (i < 20 && reader2.hasNext()) {
      Object[] row = (Object[]) reader2.readNextRow();
      System.out.println(row[0] + " " + row[1]);
      i++;
    }
    System.out.println("\nFinished");
    reader2.close();
  }
}
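
The example above drives the new API through S3. As a hedged fragment, the single-file entry point withFile(...) added in this commit could be used the same way against a placeholder local path (for S3 paths the withHadoopConf calls from the example would still be needed):

    // Sketch only: read one carbondata file directly (placeholder path), using the
    // withFile(...) method added to CarbonReaderBuilder in this commit.
    CarbonReader singleFileReader = CarbonReader
        .builder()
        .withFile("/tmp/carbon/output/part-0-0.carbondata")
        .projection(new String[]{"name", "age"})
        .build();
    while (singleFileReader.hasNext()) {
      Object[] row = (Object[]) singleFileReader.readNextRow();
      System.out.println(row[0] + " " + row[1]);
    }
    singleFileReader.close();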
@@ -150,7 +150,17 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
      LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
      for (LoadMetadataDetails load : loadMetadataDetails) {
        seg = new Segment(load.getLoadName(), null, readCommittedScope);
        if (fileLists != null) {
          for (int i = 0; i < fileLists.size(); i++) {
            if (fileLists.get(i).toString().endsWith(seg.getSegmentNo()
                + CarbonTablePath.CARBON_DATA_EXT)) {
              externalTableSegments.add(seg);
              break;
            }
          }
        } else {
          externalTableSegments.add(seg);
        }
      }
    }
    List<InputSplit> splits = new ArrayList<>();
@@ -162,7 +172,14 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
      // do block filtering and get split
      splits = getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
    } else {
      List<CarbonFile> carbonFiles = null;
      if (null != this.fileLists) {
        carbonFiles = getAllCarbonDataFiles(this.fileLists);
      } else {
        carbonFiles = getAllCarbonDataFiles(carbonTable.getTablePath());
      }

      for (CarbonFile carbonFile : carbonFiles) {
        // Segment id is set to null because SDK does not write carbondata files with respect
        // to segments. So no specific name is present for this load.
        CarbonInputSplit split =
@@ -208,6 +225,18 @@ private List<CarbonFile> getAllCarbonDataFiles(String tablePath) {
    return carbonFiles;
  }

  private List<CarbonFile> getAllCarbonDataFiles(List fileLists) {
    List<CarbonFile> carbonFiles = new LinkedList<CarbonFile>();
    try {
      for (int i = 0; i < fileLists.size(); i++) {
        carbonFiles.add(FileFactory.getCarbonFile(fileLists.get(i).toString()));
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return carbonFiles;
  }

  /**
   * {@inheritDoc}
   * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
@@ -132,6 +132,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
  protected int numStreamFiles = 0;
  protected int hitedStreamFiles = 0;
  protected int numBlocks = 0;
  protected List fileLists = null;

  private CarbonTable carbonTable;

@@ -156,6 +157,10 @@ public int getNumBlocks() {
    return numBlocks;
  }

  public void setFileLists(List fileLists) {
    this.fileLists = fileLists;
  }

  /**
   * Set the `tableInfo` in `configuration`
   */
@@ -170,6 +170,17 @@ public static CarbonReaderBuilder builder(String tablePath) {
    return builder(tablePath, tableName);
  }

  /**
   * Return a new {@link CarbonReaderBuilder} instance
   *
   * @return CarbonReaderBuilder object
   */
  public static CarbonReaderBuilder builder() {
    UUID uuid = UUID.randomUUID();
    String tableName = "UnknownTable" + uuid;
    return new CarbonReaderBuilder(tableName);
  }

  /**
   * Breaks the list of CarbonRecordReader in CarbonReader into multiple
   * CarbonReader objects, each iterating through some 'carbondata' files
@@ -41,6 +41,7 @@
import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -63,6 +64,7 @@ public class CarbonReaderBuilder {
  private boolean useVectorReader = true;
  private InputSplit inputSplit;
  private boolean useArrowReader;
  private List fileLists;

  /**
   * Construct a CarbonReaderBuilder with table path and table name
@@ -81,6 +83,54 @@ public class CarbonReaderBuilder {
    ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
  }

  /**
   * Construct a CarbonReaderBuilder with table name
   *
   * @param tableName table name
   */
  CarbonReaderBuilder(String tableName) {
    this.tableName = tableName;
    ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
  }

  /**
   * Set the carbondata file folder (table path)
   *
   * @param tablePath table path
   * @return CarbonReaderBuilder object
   */
  public CarbonReaderBuilder withFolder(String tablePath) {
    this.tablePath = tablePath;
    return this;
  }

  /**
   * Set the list of carbondata files to read
   *
   * @param fileLists carbondata file list
   * @return CarbonReaderBuilder object
   */
  public CarbonReaderBuilder withFileLists(List fileLists) {
    if (null == this.fileLists) {
      this.fileLists = fileLists;
    } else {
      this.fileLists.addAll(fileLists);
    }
    return this;
  }

  /**
   * Set one carbondata file to read
   *
   * @param file carbondata file
   * @return CarbonReaderBuilder object
   */
  public CarbonReaderBuilder withFile(String file) {
    List fileLists = new ArrayList();
    fileLists.add(file);
    return withFileLists(fileLists);
  }

  /**
   * Configure the projection column names of carbon reader
   *
@@ -190,7 +240,27 @@ private CarbonFileInputFormat prepareFileInputFormat(Job job, boolean enableBloc
        ((CarbonInputSplit) inputSplit).getSegment().getReadCommittedScope().getFilePath();
      tableName = "UnknownTable" + UUID.randomUUID();
    }
    CarbonTable table;
    // now always infer schema. TODO: refactor in next version.
    if (null == this.fileLists && null == tablePath) {
      throw new IllegalArgumentException("Please set table path first.");
    }
    if (null != this.fileLists) {
      if (fileLists.size() < 1) {
        throw new IllegalArgumentException("fileLists must contain at least one file!");
      }
      String commonString = String.valueOf(fileLists.get(0));
      for (int i = 1; i < fileLists.size(); i++) {
        commonString = commonString.substring(0, StringUtils.indexOfDifference(commonString,
            String.valueOf(fileLists.get(i))));
      }
      int index = commonString.lastIndexOf("/");
      commonString = commonString.substring(0, index);

      table = CarbonTable.buildTable(commonString, tableName, hadoopConf);
    } else {
      table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
    }
    if (enableBlockletDistribution) {
      // set cache level to blocklet level
      Map<String, String> tableProperties =
@@ -206,6 +276,9 @@
    if (filterExpression != null) {
      format.setFilterPredicates(job.getConfiguration(), filterExpression);
    }
    if (null != this.fileLists) {
      format.setFileLists(this.fileLists);
    }
    if (projectionColumns != null) {
      // set the user projection
      int len = projectionColumns.length;
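
When only a file list is supplied, prepareFileInputFormat above derives the table folder as the longest common prefix of the file paths, trimmed back to the last '/'. A self-contained illustration of that derivation with made-up paths (not part of the commit):

import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.StringUtils;

// Illustration only: mirrors the common-folder derivation in prepareFileInputFormat
// using hypothetical file paths.
public class CommonFolderSketch {
  public static void main(String[] args) {
    List<String> files = Arrays.asList(
        "/data/table1/part-0-1.carbondata",
        "/data/table1/part-0-2.carbondata");
    String common = files.get(0);
    for (int i = 1; i < files.size(); i++) {
      // indexOfDifference returns the first position at which the two strings differ
      common = common.substring(0, StringUtils.indexOfDifference(common, files.get(i)));
    }
    // "/data/table1/part-0-" -> trim back to the last folder separator
    common = common.substring(0, common.lastIndexOf("/"));
    System.out.println(common);  // prints /data/table1
  }
}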
@@ -34,7 +34,7 @@ public static ArrayList listFiles(String sourceImageFolder,
      final String suf, Configuration conf) throws Exception {
    final String sufImageFinal = suf;
    ArrayList result = new ArrayList();
    CarbonFile[] fileList = FileFactory.getCarbonFile(sourceImageFolder, conf).listFiles();
    for (int i = 0; i < fileList.length; i++) {
      if (fileList[i].isDirectory()) {
        result.addAll(listFiles(fileList[i].getCanonicalPath(), sufImageFinal, conf));
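
The one-line change above passes the caller's Hadoop Configuration through to FileFactory.getCarbonFile, so the file listing uses the configuration supplied by the caller (which in the example at the top of this commit carries the S3 access key, secret key and endpoint) rather than ignoring it.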
