Skip to content

Commit

Permalink
Introduce ReadOptimizedView & RealtimeView out of TableFileSystemView
Browse files Browse the repository at this point in the history
  - Usage now marks code as clearly using either RO or RT views, for future evolution
  - Tests on all of FileGroups and FileSlices
  • Loading branch information
Vinoth Chandar authored and prazanna committed Jun 23, 2017
1 parent c00f1a9 commit 754ab88
Show file tree
Hide file tree
Showing 16 changed files with 259 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public Dataset<Row> read(String... paths) {
+ hoodieTable.getMetaClient().getBasePath());
}

TableFileSystemView fileSystemView = new HoodieTableFileSystemView(hoodieTable.getMetaClient(),
TableFileSystemView.ReadOptimizedView fileSystemView = new HoodieTableFileSystemView(hoodieTable.getMetaClient(),
hoodieTable.getCompletedCommitTimeline(), fs.globStatus(new Path(path)));
List<HoodieDataFile> latestFiles = fileSystemView.getLatestDataFiles().collect(
Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public boolean savepoint(String commitTime, String user, String comment) {
.mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
// Scan all partitions files with this commit time
logger.info("Collecting latest files in partition path " + partitionPath);
TableFileSystemView view = table.getFileSystemView();
TableFileSystemView.ReadOptimizedView view = table.getROFileSystemView();
List<String> latestFiles =
view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime)
.map(HoodieDataFile::getFileName).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ JavaPairRDD<String, String> loadInvolvedFiles(List<String> partitions,
List<Tuple2<String, String>> list = new ArrayList<>();
if (latestCommitTime.isPresent()) {
List<HoodieDataFile> filteredFiles =
hoodieTable.getFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath,
hoodieTable.getROFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath,
latestCommitTime.get().getTimestamp()).collect(Collectors.toList());
for (HoodieDataFile file : filteredFiles) {
list.add(new Tuple2<>(partitionPath, file.getFileName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
protected final FileSystem fs;
protected final HoodieTable<T> hoodieTable;
protected HoodieTimeline hoodieTimeline;
protected TableFileSystemView fileSystemView;
protected TableFileSystemView.ReadOptimizedView fileSystemView;
protected final Schema schema;

public HoodieIOHandle(HoodieWriteConfig config, String commitTime,
Expand All @@ -50,7 +50,7 @@ public HoodieIOHandle(HoodieWriteConfig config, String commitTime,
this.fs = FSUtils.getFs();
this.hoodieTable = hoodieTable;
this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline();
this.fileSystemView = hoodieTable.getFileSystemView();
this.fileSystemView = hoodieTable.getROFileSystemView();
this.schema =
HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig
List<CompactionOperation> operations =
jsc.parallelize(partitionPaths, partitionPaths.size())
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> hoodieTable
.getFileSystemView()
.getRTFileSystemView()
.getLatestFileSlices(partitionPath)
.map(s -> new CompactionOperation(s.getDataFile().get(),
partitionPath, s.getLogFiles().collect(Collectors.toList()), config))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private List<SmallFile> getSmallFiles(String partitionPath) {

if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
List<HoodieDataFile> allFiles = getFileSystemView()
List<HoodieDataFile> allFiles = getROFileSystemView()
.getLatestDataFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp())
.collect(Collectors.toList());

Expand Down
18 changes: 18 additions & 0 deletions hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ public TableFileSystemView getFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
}

/**
* Get the read optimized view of the file system for this table
*
* @return
*/
public TableFileSystemView.ReadOptimizedView getROFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
}

/**
* Get the real time view of the file system for this table
*
* @return
*/
public TableFileSystemView.RealtimeView getRTFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
}

/**
* Get the view of the file system for this table
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;

import java.util.Collection;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
Expand Down Expand Up @@ -412,7 +413,7 @@ public void testCreateSavepoint() throws Exception {
List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
final TableFileSystemView view = table.getFileSystemView();
final TableFileSystemView.ReadOptimizedView view = table.getROFileSystemView();
List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> {
return view.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList());
Expand All @@ -431,7 +432,7 @@ public void testCreateSavepoint() throws Exception {

metaClient = new HoodieTableMetaClient(fs, basePath);
table = HoodieTable.getHoodieTable(metaClient, getConfig());
final TableFileSystemView view1 = table.getFileSystemView();
final TableFileSystemView.ReadOptimizedView view1 = table.getROFileSystemView();
dataFiles = partitionPaths.stream().flatMap(s -> {
return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList());
Expand Down Expand Up @@ -482,7 +483,7 @@ public void testRollbackToSavepoint() throws Exception {
List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
final TableFileSystemView view1 = table.getFileSystemView();
final TableFileSystemView.ReadOptimizedView view1 = table.getROFileSystemView();

List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> {
return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("003"));
Expand All @@ -501,7 +502,7 @@ public void testRollbackToSavepoint() throws Exception {

metaClient = new HoodieTableMetaClient(fs, basePath);
table = HoodieTable.getHoodieTable(metaClient, getConfig());
final TableFileSystemView view2 = table.getFileSystemView();
final TableFileSystemView.ReadOptimizedView view2 = table.getROFileSystemView();

dataFiles = partitionPaths.stream().flatMap(s -> {
return view2.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("004"));
Expand All @@ -524,7 +525,7 @@ public void testRollbackToSavepoint() throws Exception {

metaClient = new HoodieTableMetaClient(fs, basePath);
table = HoodieTable.getHoodieTable(metaClient, getConfig());
final TableFileSystemView view3 = table.getFileSystemView();
final TableFileSystemView.ReadOptimizedView view3 = table.getROFileSystemView();
dataFiles = partitionPaths.stream().flatMap(s -> {
return view3.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList());
Expand Down Expand Up @@ -961,7 +962,7 @@ public void testSmallInsertHandlingForUpserts() throws Exception {
assertEquals("2 files needs to be committed.", 2, statuses.size());
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, config);
TableFileSystemView fileSystemView = table.getFileSystemView();
TableFileSystemView.ReadOptimizedView fileSystemView = table.getROFileSystemView();
List<HoodieDataFile> files = fileSystemView.getLatestDataFilesBeforeOrOn(TEST_PARTITION_PATH, commitTime3).collect(
Collectors.toList());
int numTotalInsertsInCommit3 = 0;
Expand Down Expand Up @@ -1057,7 +1058,7 @@ public void testSmallInsertHandlingForInserts() throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
List<HoodieDataFile> files =
table.getFileSystemView().getLatestDataFilesBeforeOrOn(TEST_PARTITION_PATH, commitTime3)
table.getROFileSystemView().getLatestDataFilesBeforeOrOn(TEST_PARTITION_PATH, commitTime3)
.collect(Collectors.toList());
assertEquals("Total of 2 valid data files", 2, files.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ public void testSimpleInsertAndUpdate() throws Exception {
assertFalse(commit.isPresent());

FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
TableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient,
hoodieTable.getCompletedCompactionCommitTimeline(), allFiles);
Stream<HoodieDataFile> dataFilesToRead = fsView.getLatestDataFiles();
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());

fsView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
dataFilesToRead = fsView.getLatestDataFiles();
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
dataFilesToRead = roView.getLatestDataFiles();
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());

Expand Down Expand Up @@ -169,8 +169,8 @@ public void testSimpleInsertAndUpdate() throws Exception {
compactor.compact(jsc, getConfig(), table);

allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
fsView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
dataFilesToRead = fsView.getLatestDataFiles();
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
dataFilesToRead = roView.getLatestDataFiles();
assertTrue(dataFilesToRead.findAny().isPresent());

// verify that there is a commit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testLogFileCountsAfterCompaction() throws Exception {
table = HoodieTable.getHoodieTable(metaClient, config);
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles =
table.getFileSystemView().getLatestFileSlices(partitionPath)
table.getRTFileSystemView().getLatestFileSlices(partitionPath)
.collect(Collectors.toList());
for (FileSlice fileSlice : groupedLogFiles) {
assertEquals("There should be 1 log file written for every data file", 1,
Expand All @@ -192,7 +192,7 @@ public void testLogFileCountsAfterCompaction() throws Exception {
HoodieTimeline.GREATER));

for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles = table.getFileSystemView()
List<FileSlice> groupedLogFiles = table.getRTFileSystemView()
.getLatestFileSlices(partitionPath)
.collect(Collectors.toList());
for (FileSlice slice: groupedLogFiles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public String getFileId() {
}

public Optional<HoodieDataFile> getDataFile() {
return Optional.of(dataFile);
return Optional.ofNullable(dataFile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,91 +30,69 @@

/**
* Interface for viewing the table file system.
* Dependening on the Hoodie Table Type - The view of the filesystem changes.
* <p>
* ReadOptimizedView - Lets queries run only on organized columnar data files at the expense of latency
* WriteOptimizedView - Lets queries run on columnar data as well as delta files (sequential) at the expense of query execution time
*
* @since 0.3.0
*/
public interface TableFileSystemView {

/**
* Stream all the latest data files in the given partition
*
* @param partitionPath
* @return
* ReadOptimizedView - methods to provide a view of columnar data files only.
*/
Stream<HoodieDataFile> getLatestDataFiles(String partitionPath);
interface ReadOptimizedView {
/**
* Stream all the latest data files in the given partition
*/
Stream<HoodieDataFile> getLatestDataFiles(String partitionPath);

/**
* Stream all the latest data files, in the file system view
*
* @return
*/
Stream<HoodieDataFile> getLatestDataFiles();
/**
* Stream all the latest data files, in the file system view
*/
Stream<HoodieDataFile> getLatestDataFiles();

/**
* Stream all the latest version data files in the given partition
* with precondition that commitTime(file) before maxCommitTime
*
* @param partitionPath
* @param maxCommitTime
* @return
*/
Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath,
String maxCommitTime);
/**
* Stream all the latest version data files in the given partition with precondition that
* commitTime(file) before maxCommitTime
*/
Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath,
String maxCommitTime);

/**
* Stream all the latest data files pass
*
* @param commitsToReturn
* @return
*/
Stream<HoodieDataFile> getLatestDataFilesInRange(List<String> commitsToReturn);
/**
* Stream all the latest data files pass
*/
Stream<HoodieDataFile> getLatestDataFilesInRange(List<String> commitsToReturn);

/**
* Stream all the data file versions grouped by FileId for a given partition
*
* @param partitionPath
* @return
*/
Stream<HoodieDataFile> getAllDataFiles(String partitionPath);
/**
* Stream all the data file versions grouped by FileId for a given partition
*/
Stream<HoodieDataFile> getAllDataFiles(String partitionPath);
}

/**
* Stream all the latest file slices in the given partition
*
* @param partitionPath
* @return
* RealtimeView - methods to access a combination of columnar data files + log files with real time data.
*/
Stream<FileSlice> getLatestFileSlices(String partitionPath);
interface RealtimeView {
/**
* Stream all the latest file slices in the given partition
*/
Stream<FileSlice> getLatestFileSlices(String partitionPath);

/**
* Stream all the latest file slices in the given partition
* with precondition that commitTime(file) before maxCommitTime
*
* @param partitionPath
* @param maxCommitTime
* @return
*/
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime);
/**
* Stream all the latest file slices in the given partition with precondition that
* commitTime(file) before maxCommitTime
*/
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime);

/**
* Stream all the latest file slices, in the given range
*
* @param commitsToReturn
* @return
*/
Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn);
/**
* Stream all the latest file slices, in the given range
*/
Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn);

/**
* Stream all the file slices for a given partition, latest or not.
*
* @param partitionPath
* @return
*/
Stream<FileSlice> getAllFileSlices(String partitionPath);
/**
* Stream all the file slices for a given partition, latest or not.
*/
Stream<FileSlice> getAllFileSlices(String partitionPath);
}

/**
* Stream all the file groups for a given partition
Expand All @@ -123,12 +101,4 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
* @return
*/
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);

/**
* Get the file Status for the path specified
*
* @param path
* @return
*/
FileStatus getFileStatus(String path);
}

0 comments on commit 754ab88

Please sign in to comment.