Merge branch 'apache:master' into HUDI-3505
hechao-ustc committed Jul 1, 2022
2 parents a105b0a + 62a0c96 commit 96817f1
Showing 12 changed files with 278 additions and 61 deletions.
@@ -558,7 +558,7 @@ public String unscheduleCompaction(
@CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId")
public String unscheduleCompactFile(
@CliOption(key = "fileId", mandatory = true, help = "File Id") final String fileId,
@CliOption(key = "partitionPath", mandatory = true, help = "partition path") final String partitionPath,
@CliOption(key = "partitionPath", unspecifiedDefaultValue = "", help = "partition path") final String partitionPath,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV,
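The change repeated across these CLI commands is the same: partitionPath drops mandatory = true in favor of unspecifiedDefaultValue = "", so the commands also work on non-partitioned tables, where there is no partition path to pass. A minimal, self-contained sketch of the pattern in the Spring Shell 1.x API that the Hudi CLI uses — the command name, class, and return strings here are hypothetical, only the @CliOption handling mirrors this commit:

import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;

@Component
public class ExampleCommand implements CommandMarker {

  // When --partitionPath is omitted, Spring Shell injects "" instead of
  // rejecting the command, so the handler can branch on the empty value.
  @CliCommand(value = "example list", help = "List file slices, optionally for one partition")
  public String listSlices(
      @CliOption(key = {"partitionPath"}, unspecifiedDefaultValue = "",
          help = "A valid partition path; leave unset for non-partitioned tables") final String partitionPath) {
    if (partitionPath.isEmpty()) {
      return "scanning the table base path (non-partitioned layout)";
    }
    return "scanning partition " + partitionPath;
  }
}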
@@ -119,7 +119,7 @@ public String showAllFileSlices(

@CliCommand(value = "show fsview latest", help = "Show latest file-system view")
public String showLatestFileSlices(
@CliOption(key = {"partitionPath"}, help = "A valid partition path", mandatory = true) String partition,
@CliOption(key = {"partitionPath"}, help = "A valid partition path", unspecifiedDefaultValue = "") String partition,
@CliOption(key = {"baseFileOnly"}, help = "Only display base file view",
unspecifiedDefaultValue = "false") boolean baseFileOnly,
@CliOption(key = {"maxInstant"}, help = "File-Slices upto this instant are displayed",
@@ -53,7 +53,7 @@ public String convert(
@CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName,
@CliOption(key = "tableType", mandatory = true, help = "Table type") final String tableType,
@CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField,
@CliOption(key = "partitionPathField", mandatory = true,
@CliOption(key = "partitionPathField", unspecifiedDefaultValue = "",
help = "Partition path field name") final String partitionPathField,
@CliOption(key = {"parallelism"}, mandatory = true,
help = "Parallelism for hoodie insert") final String parallelism,
@@ -27,6 +27,7 @@
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
+ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
@@ -225,7 +226,7 @@ public String listPartitions(

@CliCommand(value = "metadata list-files", help = "Print a list of all files in a partition from the metadata")
public String listFiles(
@CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true) final String partition) throws IOException {
@CliOption(key = {"partition"}, help = "Name of the partition to list files", unspecifiedDefaultValue = "") final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
@@ -235,8 +236,13 @@ public String listFiles(
return "[ERROR] Metadata Table not enabled/initialized\n\n";
}

+ Path partitionPath = new Path(HoodieCLI.basePath);
+ if (!StringUtils.isNullOrEmpty(partition)) {
+ partitionPath = new Path(HoodieCLI.basePath, partition);
+ }
+
HoodieTimer timer = new HoodieTimer().startTimer();
- FileStatus[] statuses = metaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
+ FileStatus[] statuses = metaReader.getAllFilesInPartition(partitionPath);
LOG.debug("Took " + timer.endTimer() + " ms");

final List<Comparable[]> rows = new ArrayList<>();
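With the fallback above, metadata list-files no longer requires --partition: an empty value now means the table base path, which is exactly what a non-partitioned table needs. A hedged usage sketch of the two invocations (the trips table name in the prompt is hypothetical; the partition value is borrowed from the test fixtures below):

hudi:trips->metadata list-files --partition 2016/03/15
hudi:trips->metadata list-files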
@@ -59,24 +59,73 @@
@Tag("functional")
public class TestFileSystemViewCommand extends CLIFunctionalTestHarness {

+ private String nonpartitionedTablePath;
+ private String partitionedTablePath;
private String partitionPath;
- private SyncableFileSystemView fsView;
+ private SyncableFileSystemView nonpartitionedFsView;
+ private SyncableFileSystemView partitionedFsView;

@BeforeEach
public void init() throws IOException {
+ createNonpartitionedTable();
+ createPartitionedTable();
+ }

+ private void createNonpartitionedTable() throws IOException {
HoodieCLI.conf = hadoopConf();

// Create table and connect
- String tableName = tableName();
- String tablePath = tablePath(tableName);
+ String nonpartitionedTableName = "nonpartitioned_" + tableName();
+ nonpartitionedTablePath = tablePath(nonpartitionedTableName);
new TableCommand().createTable(
- tablePath, tableName,
- "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
+ nonpartitionedTablePath, nonpartitionedTableName,
+ "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");

HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();

+ Files.createDirectories(Paths.get(nonpartitionedTablePath));

+ // Generate 2 commits
+ String commitTime1 = "3";
+ String commitTime2 = "4";
+
+ String fileId1 = UUID.randomUUID().toString();
+
+ // Write data files and log file
+ String testWriteToken = "2-0-2";
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+ .makeBaseFileName(commitTime1, testWriteToken, fileId1)));
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+ .makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0, testWriteToken)));
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+ .makeBaseFileName(commitTime2, testWriteToken, fileId1)));
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+ .makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, testWriteToken)));
+
+ // Write commit files
+ Files.createFile(Paths.get(nonpartitionedTablePath, ".hoodie", commitTime1 + ".commit"));
+ Files.createFile(Paths.get(nonpartitionedTablePath, ".hoodie", commitTime2 + ".commit"));
+
+ // Reload meta client and create fsView
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ nonpartitionedFsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
+ }

+ private void createPartitionedTable() throws IOException {
+ HoodieCLI.conf = hadoopConf();
+
+ // Create table and connect
+ String partitionedTableName = "partitioned_" + tableName();
+ partitionedTablePath = tablePath(partitionedTableName);
+ new TableCommand().createTable(
+ partitionedTablePath, partitionedTableName,
+ "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+ HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+
partitionPath = HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH;
- String fullPartitionPath = Paths.get(tablePath, partitionPath).toString();
+ String fullPartitionPath = Paths.get(partitionedTablePath, partitionPath).toString();
Files.createDirectories(Paths.get(fullPartitionPath));

// Generate 2 commits
@@ -97,13 +146,13 @@ public void init() throws IOException {
.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, testWriteToken)));

// Write commit files
- Files.createFile(Paths.get(tablePath, ".hoodie", commitTime1 + ".commit"));
- Files.createFile(Paths.get(tablePath, ".hoodie", commitTime2 + ".commit"));
+ Files.createFile(Paths.get(partitionedTablePath, ".hoodie", commitTime1 + ".commit"));
+ Files.createFile(Paths.get(partitionedTablePath, ".hoodie", commitTime2 + ".commit"));

// Reload meta client and create fsView
metaClient = HoodieTableMetaClient.reload(metaClient);

- fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
+ partitionedFsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
}

/**
Expand All @@ -116,7 +165,7 @@ public void testShowCommits() {
assertTrue(cr.isSuccess());

// Get all file groups
- Stream<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath);
+ Stream<HoodieFileGroup> fileGroups = partitionedFsView.getAllFileGroups(partitionPath);

List<Comparable[]> rows = new ArrayList<>();
fileGroups.forEach(fg -> fg.getAllFileSlices().forEach(fs -> {
@@ -164,7 +213,7 @@ public void testShowCommitsWithSpecifiedValues() {
assertTrue(cr.isSuccess());

List<Comparable[]> rows = new ArrayList<>();
- Stream<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath);
+ Stream<HoodieFileGroup> fileGroups = partitionedFsView.getAllFileGroups(partitionPath);

// Only get instant 1, since maxInstant was specified 2
fileGroups.forEach(fg -> fg.getAllFileSlices().filter(fs -> fs.getBaseInstantTime().equals("1")).forEach(fs -> {
@@ -197,17 +246,7 @@ public void testShowCommitsWithSpecifiedValues() {
assertEquals(expected, got);
}

- /**
- * Test case for command 'show fsview latest'.
- */
- @Test
- public void testShowLatestFileSlices() {
- // Test show with partition path '2016/03/15'
- CommandResult cr = shell().executeCommand("show fsview latest --partitionPath " + partitionPath);
- assertTrue(cr.isSuccess());
-
- Stream<FileSlice> fileSlice = fsView.getLatestFileSlices(partitionPath);
-
+ private List<Comparable[]> fileSlicesToCRList(Stream<FileSlice> fileSlice, String partitionPath) {
List<Comparable[]> rows = new ArrayList<>();
fileSlice.forEach(fs -> {
int idx = 0;
@@ -245,7 +284,14 @@ public void testShowLatestFileSlices() {
.collect(Collectors.toList()).toString();
rows.add(row);
});
+ return rows;
+ }
+
+ /**
+ * Test case for command 'show fsview latest'.
+ */
+ @Test
+ public void testShowLatestFileSlices() throws IOException {
Function<Object, String> converterFunction =
entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
@@ -267,9 +313,32 @@
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_BASE_UNSCHEDULED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_FILES_SCHEDULED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_FILES_UNSCHEDULED);
- String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows);
- expected = removeNonWordAndStripSpace(expected);
- String got = removeNonWordAndStripSpace(cr.getResult().toString());
- assertEquals(expected, got);

+ // Test show with partition path '2016/03/15'
+ new TableCommand().connect(partitionedTablePath, null, false, 0, 0, 0);
+ CommandResult partitionedTableCR = shell().executeCommand("show fsview latest --partitionPath " + partitionPath);
+ assertTrue(partitionedTableCR.isSuccess());
+
+ Stream<FileSlice> partitionedFileSlice = partitionedFsView.getLatestFileSlices(partitionPath);
+
+ List<Comparable[]> partitionedRows = fileSlicesToCRList(partitionedFileSlice, partitionPath);
+ String partitionedExpected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, partitionedRows);
+ partitionedExpected = removeNonWordAndStripSpace(partitionedExpected);
+ String partitionedResults = removeNonWordAndStripSpace(partitionedTableCR.getResult().toString());
+ assertEquals(partitionedExpected, partitionedResults);
+
+ // Test show for non-partitioned table
+ new TableCommand().connect(nonpartitionedTablePath, null, false, 0, 0, 0);
+ CommandResult nonpartitionedTableCR = shell().executeCommand("show fsview latest");
+ assertTrue(nonpartitionedTableCR.isSuccess());
+
+ Stream<FileSlice> nonpartitionedFileSlice = nonpartitionedFsView.getLatestFileSlices("");
+
+ List<Comparable[]> nonpartitionedRows = fileSlicesToCRList(nonpartitionedFileSlice, "");
+
+ String nonpartitionedExpected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, nonpartitionedRows);
+ nonpartitionedExpected = removeNonWordAndStripSpace(nonpartitionedExpected);
+ String nonpartitionedResults = removeNonWordAndStripSpace(nonpartitionedTableCR.getResult().toString());
+ assertEquals(nonpartitionedExpected, nonpartitionedResults);
}
}
@@ -21,6 +21,7 @@
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.HoodieTimer;
+ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

@@ -60,6 +61,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;

+ import static org.apache.hudi.common.fs.StorageSchemes.HDFS;

/**
* HoodieWrapperFileSystem wraps the default file system. It holds state about the open streams in the file system to
* support getting the written size to each of the open streams.
@@ -68,6 +71,8 @@ public class HoodieWrapperFileSystem extends FileSystem {

public static final String HOODIE_SCHEME_PREFIX = "hoodie-";

+ private static final String TMP_PATH_POSTFIX = ".tmp";

protected enum MetricName {
create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write
}
@@ -986,6 +991,65 @@ public long getBytesWritten(Path file) {
file.toString() + " does not have a open stream. Cannot get the bytes written on the stream");
}

+ protected boolean needCreateTempFile() {
+ return HDFS.getScheme().equals(fileSystem.getScheme());
+ }
+
+ /**
+ * Creates a new file with overwrite set to false. This ensures files are created
+ * only once and never rewritten. If the content is non-empty and {needCreateTempFile}
+ * returns true, the content is first written to a temp file, which is then renamed
+ * to the target path once the write completes.
+ *
+ * @param fullPath File path
+ * @param content Content to be stored
+ */
+ public void createImmutableFileInPath(Path fullPath, Option<byte[]> content)
+ throws HoodieIOException {
+ FSDataOutputStream fsout = null;
+ Path tmpPath = null;
+
+ boolean needTempFile = needCreateTempFile();
+
+ try {
+ if (!content.isPresent()) {
+ fsout = fileSystem.create(fullPath, false);
+ }
+
+ if (content.isPresent() && needTempFile) {
+ Path parent = fullPath.getParent();
+ tmpPath = new Path(parent, fullPath.getName() + TMP_PATH_POSTFIX);
+ fsout = fileSystem.create(tmpPath, false);
+ fsout.write(content.get());
+ }
+
+ if (content.isPresent() && !needTempFile) {
+ fsout = fileSystem.create(fullPath, false);
+ fsout.write(content.get());
+ }
+ } catch (IOException e) {
+ String errorMsg = "Failed to create file " + (tmpPath != null ? tmpPath : fullPath);
+ throw new HoodieIOException(errorMsg, e);
+ } finally {
+ try {
+ if (null != fsout) {
+ fsout.close();
+ }
+ } catch (IOException e) {
+ String errorMsg = "Failed to close file " + (needTempFile ? tmpPath : fullPath);
+ throw new HoodieIOException(errorMsg, e);
+ }
+
+ try {
+ if (null != tmpPath) {
+ fileSystem.rename(tmpPath, fullPath);
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to rename " + tmpPath + " to the target " + fullPath, e);
+ }
+ }
+ }

public FileSystem getFileSystem() {
return fileSystem;
}
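A note on the design above: on HDFS (the only scheme for which needCreateTempFile() returns true), rename is an atomic metadata operation, so writing the content to a .tmp sibling first and renaming it into place means readers can never observe a partially written file, while create(path, false) still guarantees the file is written at most once. On object stores, a rename is typically a full copy, so the direct-create path is kept there. A minimal usage sketch, assuming metaClient.getFs() returns this wrapper as elsewhere in Hudi — the instant file name and payload are illustrative, not taken from this commit:

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;

public class ImmutableWriteExample {

  // Writes a commit file exactly once through the wrapper; a second call with
  // the same path fails with HoodieIOException because create() uses overwrite=false.
  public static void writeCommitFile(HoodieTableMetaClient metaClient, String json) {
    HoodieWrapperFileSystem fs = metaClient.getFs();
    Path commitFile = new Path(metaClient.getMetaPath(), "20220701120000.commit"); // illustrative instant
    fs.createImmutableFileInPath(commitFile, Option.of(json.getBytes(StandardCharsets.UTF_8)));
  }
}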
@@ -30,7 +30,6 @@
import org.apache.hudi.exception.HoodieIOException;

import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -539,7 +538,7 @@ protected void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant
if (allowRedundantTransitions) {
FileIOUtils.createFileInPath(metaClient.getFs(), getInstantFileNamePath(toInstant.getFileName()), data);
} else {
- createImmutableFileInPath(getInstantFileNamePath(toInstant.getFileName()), data);
+ metaClient.getFs().createImmutableFileInPath(getInstantFileNamePath(toInstant.getFileName()), data);
}
LOG.info("Create new file for toInstant ?" + getInstantFileNamePath(toInstant.getFileName()));
}
@@ -706,33 +705,7 @@ protected void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content);
} else {
- createImmutableFileInPath(fullPath, content);
- }
- }
-
- /**
- * Creates a new file in timeline with overwrite set to false. This ensures
- * files are created only once and never rewritten
- * @param fullPath File Path
- * @param content Content to be stored
- */
- private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
- FSDataOutputStream fsout = null;
- try {
- fsout = metaClient.getFs().create(fullPath, false);
- if (content.isPresent()) {
- fsout.write(content.get());
- }
- } catch (IOException e) {
- throw new HoodieIOException("Failed to create file " + fullPath, e);
- } finally {
- try {
- if (null != fsout) {
- fsout.close();
- }
- } catch (IOException e) {
- throw new HoodieIOException("Failed to close file " + fullPath, e);
- }
- }
+ metaClient.getFs().createImmutableFileInPath(fullPath, content);
}
}
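Together with the HoodieWrapperFileSystem change above, this removes the timeline's private copy of the immutable-write logic: all instant files are now created through the wrapper, so the HDFS temp-file-and-rename behavior applies uniformly to timeline writes as well.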
