Skip to content

Commit

Permalink
getSplits should clear the cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed May 29, 2019
1 parent 1023ba9 commit 5e0cffa
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
Expand Up @@ -250,7 +250,7 @@ public static CarbonTable buildTable(
String tablePath,
String tableName,
Configuration configuration) throws IOException {
TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, tableName, "null");
// InferSchema from data file
org.apache.carbondata.format.TableInfo tableInfo =
CarbonUtil.inferSchema(tablePath, tableName, false, configuration);
Expand Down
Expand Up @@ -384,9 +384,6 @@ private <T> CarbonReader<T> buildWithSplits(InputSplit inputSplit)
return new CarbonReader<>(readers);
}
} catch (Exception ex) {
// Clear the datamap cache as it can get added in getSplits() method
DataMapStoreManager.getInstance().clearDataMaps(
format.getOrCreateCarbonTable((job.getConfiguration())).getAbsoluteTableIdentifier());
throw ex;
}
}
Expand All @@ -409,6 +406,9 @@ public InputSplit[] getSplits(boolean enableBlockletDistribution) throws IOExcep
CarbonFileInputFormat format = prepareFileInputFormat(job, enableBlockletDistribution, false);
List<InputSplit> splits =
format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
// Clear the datamap cache as it is added in getSplits() method
DataMapStoreManager.getInstance().clearDataMaps(
format.getOrCreateCarbonTable((job.getConfiguration())).getAbsoluteTableIdentifier());
for (InputSplit split : splits) {
// Load the detailInfo
((CarbonInputSplit) split).getDetailInfo();
Expand Down
Expand Up @@ -2586,4 +2586,26 @@ public void testReadBlocklet() throws IOException, InterruptedException {
Assert.assertEquals(totalCount, 1000000);
FileUtils.deleteDirectory(new File(path));
}

@Test
public void testGetSplits() throws IOException, InterruptedException {
String path = "./testWriteFiles/" + System.nanoTime();
FileUtils.deleteDirectory(new File(path));

Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);

TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 1, 100);

InputSplit[] splits = CarbonReader.builder(path).getSplits(true);
// check for 3 blocklet count (as only one carbon file will be created)
Assert.assertEquals(splits.length, 3);

InputSplit[] splits1 = CarbonReader.builder(path).getSplits(false);
// check for 1 block count (as only one carbon file will be created)
Assert.assertEquals(splits1.length, 1);
FileUtils.deleteDirectory(new File(path));
}

}

0 comments on commit 5e0cffa

Please sign in to comment.