support files in different folders when using withFileLists and getSplits
xubo245 committed May 21, 2019
1 parent 02f7b11 commit e50993f
Showing 3 changed files with 91 additions and 8 deletions.
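At a glance, the change enables the pattern below, condensed from the new test at the bottom of this commit (listFiles is the test class's own helper; the paths are illustrative). Previously the table path was inferred from the first file in the list alone, so files from a second folder fell outside it.

    List fileLists = listFiles("./target/flowersFolder1", CarbonTablePath.CARBON_DATA_EXT);
    fileLists.addAll(listFiles("./target/flowersFolder2", CarbonTablePath.CARBON_DATA_EXT));

    // Split computation now succeeds even though the files span two folders.
    InputSplit[] splits = ArrowCarbonReader
        .builder()
        .withFileLists(fileLists)
        .getSplits(true);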
@@ -250,12 +250,7 @@ public static void updateTableInfo(TableInfo tableInfo) {
   public static CarbonTable buildTable(
       String tablePath,
       String tableName,
-      Configuration configuration,
-      boolean isFile) throws IOException {
-    if (isFile) {
-      int index = tablePath.lastIndexOf("/");
-      tablePath = tablePath.substring(0, index);
-    }
+      Configuration configuration) throws IOException {
     TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
     // InferSchema from data file
     org.apache.carbondata.format.TableInfo tableInfo =
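For context, the deleted isFile branch reduced to the sketch below (firstFilePath stands for fileLists.get(0), the only path it ever looked at). Trimming a single path at the last '/' silently assumed every file passed to withFileLists lived in that one folder; the commit moves path derivation to the caller, so the flag disappears from buildTable.

    // Old behavior, sketched: derive the table folder from the FIRST file only.
    String tablePath = firstFilePath.substring(0, firstFilePath.lastIndexOf("/"));
    // "./target/flowersFolder1/part-0.carbondata" -> "./target/flowersFolder1"
    // A file under "./target/flowersFolder2" falls outside the inferred path.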
@@ -41,6 +41,7 @@
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -248,9 +249,17 @@ private CarbonFileInputFormat prepareFileInputFormat(Job job, boolean enableBlockletDistribution
       if (fileLists.size() < 1) {
         throw new IllegalArgumentException("fileLists must have at least one file!");
       }
-      table = CarbonTable.buildTable(this.fileLists.get(0).toString(), tableName, hadoopConf, true);
+      String commonString = String.valueOf(fileLists.get(0));
+      for (int i = 1; i < fileLists.size(); i++) {
+        commonString = commonString.substring(0, StringUtils.indexOfDifference(commonString,
+            String.valueOf(fileLists.get(i))));
+      }
+      int index = commonString.lastIndexOf("/");
+      commonString = commonString.substring(0, index);
+
+      table = CarbonTable.buildTable(commonString, tableName, hadoopConf);
     } else {
-      table = CarbonTable.buildTable(tablePath, tableName, hadoopConf, false);
+      table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
     }
     if (enableBlockletDistribution) {
       // set cache level to blocklet level
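The replacement logic narrows commonString to the longest prefix shared by every listed path, then cuts back to the last '/' so that a partial folder-name match is not mistaken for a real folder. A self-contained sketch of the same idea (commonParentFolder is a hypothetical name; only commons-lang3 is assumed):

    import java.util.List;
    import org.apache.commons.lang3.StringUtils;

    static String commonParentFolder(List<String> paths) {
      String common = paths.get(0);
      for (int i = 1; i < paths.size(); i++) {
        // indexOfDifference returns the first index at which the two strings differ
        common = common.substring(0, StringUtils.indexOfDifference(common, paths.get(i)));
      }
      // "./target/flowersFolder1/x" and "./target/flowersFolder2/y" share the prefix
      // "./target/flowersFolder"; trimming at the last '/' yields the real folder "./target".
      return common.substring(0, common.lastIndexOf("/"));
    }

Note that StringUtils.indexOfDifference returns -1 when its two arguments are equal, so a list that repeats the same path would need a guard before the substring call.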
@@ -959,4 +959,83 @@ public void testBinaryWithProjectionAndFileListsAndWithFile() throws Exception {
     }
   }
 
+  @Test
+  public void testGetSplitWithFileListsFromDifferentFolder() throws Exception {
+
+    String path1 = "./target/flowersFolder1";
+    String path2 = "./target/flowersFolder2";
+    writeCarbonFile(path1, 3);
+    writeCarbonFile(path2, 2);
+    List fileLists = listFiles(path1, CarbonTablePath.CARBON_DATA_EXT);
+    fileLists.addAll(listFiles(path2, CarbonTablePath.CARBON_DATA_EXT));
+
+    InputSplit[] splits = ArrowCarbonReader
+        .builder()
+        .withFileLists(fileLists)
+        .getSplits(true);
+    Assert.assertTrue(5 == splits.length);
+    for (int j = 0; j < splits.length; j++) {
+      ArrowCarbonReader.builder(splits[j]).build();
+    }
+  }
+
+  public void writeCarbonFile(String path, int num) throws Exception {
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[5];
+    fields[0] = new Field("imageId", DataTypes.INT);
+    fields[1] = new Field("imageName", DataTypes.STRING);
+    fields[2] = new Field("imageBinary", DataTypes.BINARY);
+    fields[3] = new Field("txtName", DataTypes.STRING);
+    fields[4] = new Field("txtContent", DataTypes.STRING);
+
+    String imageFolder = "./src/test/resources/image/flowers";
+
+    byte[] originBinary = null;
+
+    // write the image data num times, producing one data file per iteration
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter
+          .builder()
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("SDKS3Example")
+          .withPageSizeInMb(1)
+          .build();
+      ArrayList files = listFiles(imageFolder, ".jpg");
+
+      if (null != files) {
+        for (int i = 0; i < files.size(); i++) {
+          // read image and encode to Hex
+          BufferedInputStream bis = new BufferedInputStream(new FileInputStream(files.get(i).toString()));
+          char[] hexValue = null;
+          originBinary = new byte[bis.available()];
+          while ((bis.read(originBinary)) != -1) {
+            hexValue = Hex.encodeHex(originBinary);
+          }
+
+          // read the matching .txt file for this image
+          String txtFileName = files.get(i).toString().split(".jpg")[0] + ".txt";
+          BufferedInputStream txtBis = new BufferedInputStream(new FileInputStream(txtFileName));
+          String txtValue = null;
+          byte[] txtBinary = null;
+          txtBinary = new byte[txtBis.available()];
+          while ((txtBis.read(txtBinary)) != -1) {
+            txtValue = new String(txtBinary, "UTF-8");
+          }
+          txtBis.close();
+          // write one row: id, image name, hex-encoded image, txt name, txt content
+          System.out.println(files.get(i).toString());
+          writer.write(new String[]{String.valueOf(i), files.get(i).toString(), String.valueOf(hexValue),
+              txtFileName, txtValue});
+          bis.close();
+        }
+      }
+      writer.close();
+    }
+  }
+
 }
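The new test writes three carbondata files to one folder and two to another, asserts that getSplits(true) yields five splits (one per file written), and builds a reader from each split. It depends on a listFiles(folder, suffix) helper that already exists elsewhere in this test class; a minimal compatible sketch, an assumption rather than the actual helper:

    import java.io.File;
    import java.util.ArrayList;

    static ArrayList<String> listFiles(String folderPath, String suffix) {
      ArrayList<String> result = new ArrayList<>();
      File[] entries = new File(folderPath).listFiles();
      if (entries == null) {
        return result; // folder missing or unreadable
      }
      for (File f : entries) {
        if (f.isFile() && f.getName().endsWith(suffix)) {
          result.add(f.getPath());
        }
      }
      return result;
    }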
