[CARBONDATA-3405] Fix getSplits() should clear the cache in SDK
Problem: when getSplits() is called back to back, once with blocklet-level
distribution and once with block-level distribution, the block-level cache is not set.
Cause: the cache key was dbName_tableName, but the table name was always hardcoded to "null".
Solution: set the real table name in the cache key, and clear the datamap cache
after the splits have been collected in getSplits().

This closes #3247
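
For context, here is a minimal sketch of the failure mode described above, in plain Java. The SplitCacheSketch class, the buggyKey/fixedKey helpers, and the HashMap standing in for CarbonData's datamap cache are all hypothetical, not the actual implementation: the sketch only illustrates why a key with a hardcoded table name lets back-to-back calls collide on one stale entry, and how clearing the cache after the splits are collected avoids that.

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a cache keyed by dbName_tableName.
// Not CarbonData code; it only illustrates the collision described above.
public class SplitCacheSketch {

  private static final Map<String, String> CACHE = new HashMap<>();

  // Before the fix: the table name was effectively hardcoded to "null",
  // so different calls ended up sharing one cache key.
  static String buggyKey(String dbName, String tableName) {
    return dbName + "_" + "null";
  }

  // After the fix: the real table name is part of the key.
  static String fixedKey(String dbName, String tableName) {
    return dbName + "_" + tableName;
  }

  static String getSplits(String dbName, String tableName, boolean blockletDistribution,
      boolean useFixedKey) {
    String key = useFixedKey ? fixedKey(dbName, tableName) : buggyKey(dbName, tableName);
    try {
      // The first call populates the cache; a later call with a colliding key
      // silently reuses whatever the previous call cached.
      return CACHE.computeIfAbsent(key,
          k -> blockletDistribution ? "blocklet-level splits" : "block-level splits");
    } finally {
      if (useFixedKey) {
        // Mirrors the fix: drop the entry once the splits are collected,
        // so the next call rebuilds it with the requested distribution.
        CACHE.remove(key);
      }
    }
  }

  public static void main(String[] args) {
    // Buggy behaviour: the second call gets the stale blocklet-level entry.
    System.out.println(getSplits("default", "t1", true, false));   // blocklet-level splits
    System.out.println(getSplits("default", "t1", false, false));  // blocklet-level splits (stale)

    CACHE.clear();

    // Fixed behaviour: each call builds and then clears its own entry.
    System.out.println(getSplits("default", "t1", true, true));    // blocklet-level splits
    System.out.println(getSplits("default", "t1", false, true));   // block-level splits
  }
}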
ajantha-bhat authored and kunal642 committed May 31, 2019
1 parent 32f5b50 commit 2251528
Showing 3 changed files with 40 additions and 11 deletions.
@@ -250,7 +250,7 @@ public static CarbonTable buildTable(
       String tablePath,
       String tableName,
       Configuration configuration) throws IOException {
-    TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
+    TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, tableName, "null");
     // InferSchema from data file
     org.apache.carbondata.format.TableInfo tableInfo =
         CarbonUtil.inferSchema(tablePath, tableName, false, configuration);
@@ -384,9 +384,6 @@ private <T> CarbonReader<T> buildWithSplits(InputSplit inputSplit)
         return new CarbonReader<>(readers);
       }
     } catch (Exception ex) {
-      // Clear the datamap cache as it can get added in getSplits() method
-      DataMapStoreManager.getInstance().clearDataMaps(
-          format.getOrCreateCarbonTable((job.getConfiguration())).getAbsoluteTableIdentifier());
       throw ex;
     }
   }
@@ -405,13 +402,23 @@ public InputSplit[] getSplits(boolean enableBlockletDistribution) throws IOException {
     if (hadoopConf == null) {
       hadoopConf = FileFactory.getConfiguration();
     }
-    final Job job = new Job(new JobConf(hadoopConf));
-    CarbonFileInputFormat format = prepareFileInputFormat(job, enableBlockletDistribution, false);
-    List<InputSplit> splits =
-        format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
-    for (InputSplit split : splits) {
-      // Load the detailInfo
-      ((CarbonInputSplit) split).getDetailInfo();
-    }
+    Job job = null;
+    List<InputSplit> splits;
+    CarbonFileInputFormat format = null;
+    try {
+      job = new Job(new JobConf(hadoopConf));
+      format = prepareFileInputFormat(job, enableBlockletDistribution, false);
+      splits = format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+      for (InputSplit split : splits) {
+        // Load the detailInfo
+        ((CarbonInputSplit) split).getDetailInfo();
+      }
+    } finally {
+      if (format != null) {
+        // Clear the datamap cache as it is added in getSplits() method
+        DataMapStoreManager.getInstance().clearDataMaps(
+            format.getOrCreateCarbonTable((job.getConfiguration())).getAbsoluteTableIdentifier());
+      }
+    }
     return splits.toArray(new InputSplit[splits.size()]);
   }
@@ -2586,4 +2586,26 @@ public void testReadBlocklet() throws IOException, InterruptedException {
     Assert.assertEquals(totalCount, 1000000);
     FileUtils.deleteDirectory(new File(path));
   }
+
+  @Test
+  public void testGetSplits() throws IOException, InterruptedException {
+    String path = "./testWriteFiles/" + System.nanoTime();
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 1, 100);
+
+    InputSplit[] splits = CarbonReader.builder(path).getSplits(true);
+    // check for 3 blocklet count (as only one carbon file will be created)
+    Assert.assertEquals(splits.length, 3);
+
+    InputSplit[] splits1 = CarbonReader.builder(path).getSplits(false);
+    // check for 1 block count (as only one carbon file will be created)
+    Assert.assertEquals(splits1.length, 1);
+    FileUtils.deleteDirectory(new File(path));
+  }
+
 }
