
[FLINK-14254][table] Introduce FileSystemOutputFormat for batch #9864

Closed · wants to merge 2 commits

Conversation

@JingsongLi (Contributor) commented Oct 9, 2019

What is the purpose of the change

Introduce FileSystemOutputFormat to support all table file system connectors, with partition support, in batch mode.

Brief change log

FileSystemOutputFormat uses a PartitionWriter to write:

  • DynamicPartitionWriter: writes multiple partitions at the same time; it may consume more memory.
  • GroupedPartitionWriter: for grouped dynamic partition inserts. It creates a new format when the partition changes.
  • NonPartitionWriter: a non-partition-aware writer. It uses a single format to write within a transaction.

FileSystemOutputFormat uses a FileCommitter to commit temporary files.

PartitionWriters and the Committer support transactions; this enables streaming checkpoint support. In batch mode, a single transaction spans the whole job.
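The writer selection described above can be sketched in code. The following is a minimal, hypothetical illustration of how one of the three writers might be chosen from the (dynamicPartition, grouped) flags; names follow the PR description, but the actual factory in the PR differs in detail:

```java
// Minimal sketch of choosing a PartitionWriter implementation.
// The enum and chooser are hypothetical; the PR's real PartitionWriterFactory
// takes similar flags but returns concrete writer instances.
public class WriterChooser {
    public enum Writer { DYNAMIC, GROUPED, SINGLE_DIRECTORY }

    public static Writer choose(boolean dynamicPartition, boolean grouped) {
        if (!dynamicPartition) {
            return Writer.SINGLE_DIRECTORY; // non-partition-aware: one format per transaction
        }
        // grouped input lets us close the previous format when the partition changes,
        // instead of keeping all partition formats open at once
        return grouped ? Writer.GROUPED : Writer.DYNAMIC;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, false)); // DYNAMIC
    }
}
```

The grouped variant trades generality for memory: it only works when input rows arrive clustered by partition, but it never holds more than one open format.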

Verifying this change

Add FileSystemOutputFormatTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot (Collaborator) commented Oct 9, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 7e10d61 (Wed Dec 04 15:13:39 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Oct 9, 2019

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build

@docete (Contributor) commented Oct 25, 2019

LGTM. Thanks for your PR @JingsongLi .

@JingsongLi (Author):

@KurtYoung Can you take a look when you're free?

@JingsongLi (Author):

ping @KurtYoung ~

@KurtYoung (Contributor):

why putting this to flink-table-common?

@KurtYoung (Contributor):

It also doesn't feel right when you introduce 10+ new classes but only test one of them.

@JingsongLi (Author):

why putting this to flink-table-common?

Do you mean we could put it in flink-connector-filesystem?
1. The main class in flink-connector-filesystem is BucketingSink, and all of its classes are deprecated.
2. If we add it to the filesystem module, filesystem formats like csv, parquet, and hive would need to add the filesystem dependency.
Those are my only slight concerns; I am OK with moving the classes to flink-connector-filesystem too.

@JingsongLi (Author):

It also doesn't feel right when you introduce 10+ new classes but only test one of them.

I have added some tests locally; I wanted to add them after making sure the main logic is not under dispute. I will add tests ASAP.

@KurtYoung (Contributor):

We can try to find more appropriate modules for these but flink-table-common is definitely not one of them.

@JingsongLi (Author):

We can try to find more appropriate modules for these but flink-table-common is definitely not one of them.

I have moved them to the blink planner so they remain an internal implementation.

@JingsongLi (Author):

It also doesn't feel right when you introduce 10+ new classes but only test one of them.

Hi @KurtYoung , Added tests.

@KurtYoung (Contributor) left a comment:

I reviewed some of the abstractions you brought up and left some comments.

@KurtYoung (Contributor):

BTW, this could move to flink-table-runtime-blink?

/**
* Path generator to generate new path to write and prepare task temporary directory.
*/
final class PathGenerator {
Contributor:

No need to be an inner class?

Author:

Now that we expose it to writers, it must be public.

Contributor:

I mean, why is it a class inside FileCommitter? The class name doesn't seem tied to FileCommitter IMO.

Author:

I will make it an independent interface. FileCommitter can be an interface too.

@JingsongLi (Author):

Hi @KurtYoung , I refactored the codes, and integrated hive to FileSystemOutputFormat. Hope you can take a look.
CC: @lirui-apache

@JingsongLi JingsongLi force-pushed the batchFile branch 2 times, most recently from 2c09900 to bc77f17 Compare November 6, 2019 09:52
@JingsongLi (Author):

Split responsibilities of FileSystemCommitter, added comments to each role.

/**
* Hive {@link FileSystemFactory}, hive need use job conf to create file system.
*/
public class HiveFileSystemFactory implements FileSystemFactory {
Contributor:

I suggest rename this to HadoopFileSystemFactory since there's nothing specific about Hive here.

* to remote, so we should not create too frequently.
*/
@Internal
public interface MetaStoreFactory extends Serializable {
Contributor:

Rename to TableMetaStoreFactory?

/**
* Create a {@link TableMetaStore}.
*/
TableMetaStore createTableMetaStore() throws Exception;
Contributor:

A TableMetaStore should be created for a specific table. So I think it's more natural if this API accepts a table path -- DB name and table name.

Author:

TableMetaStoreFactory already specifies the DB name and table name; we don't want to make the invoker provide them every time and everywhere. That would be meaningless, since each factory exists for just a single table.

public Optional<Path> getPartition(
LinkedHashMap<String, String> partSpec) throws Exception {
try {
return Optional.of(new Path(client.getPartition(
Contributor:

What happens if table is not partitioned?

Author:

In that case it will invoke PartitionLoader.loadNonPartition and never reach here.

Partition partition = HiveTableUtil.createHivePartition(database, tableName,
new ArrayList<>(partSpec.values()), newSd, new HashMap<>());
partition.setValues(new ArrayList<>(partSpec.values()));
client.add_partition(partition);
Contributor:

Don't we have to handle cases when table is not partitioned or the partition already exists?

Author:

These two interfaces just wrap client.getPartition and client.add_partition; no other logic should live here.

* A factory to create file systems.
*/
@Internal
public interface FileSystemFactory extends Serializable {
Contributor:

why not use org.apache.flink.core.fs.FileSystemFactory?

Author:

I don't want to introduce getScheme into FileSystemFactory. And it is not serializable.

/**
* Utils for file system.
*/
public class FileSystemUtils {
Contributor:

More like PartitionPathUtils to me

* @return An escaped path name.
*/
private static String escapePathName(String path) {

Contributor:

useless blank line

*/
private static String escapePathName(String path) {

// __DEFAULT_NULL__ is the system default value for null and empty string.
Contributor:

what's DEFAULT_NULL?

Author:

Wrong comment, I will remove it.

* @param partitionSpec The partition spec.
* @return An escaped, valid partition name.
*/
public static String generatePartName(LinkedHashMap<String, String> partitionSpec) {
Contributor:

generatePartName -> generatePartitionPath?
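For context, a Hive-style partition path is built by joining `key=value` pairs in partition-column order, as in the `p0=1/p1=2` example elsewhere in this PR. A minimal sketch (hypothetical helper name; the real implementation also escapes special characters, which is omitted here):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of generating a partition path like "p0=1/p1=2/" from an ordered spec.
// A LinkedHashMap preserves partition-column order, which matters for the path.
public class PartitionPathSketch {
    public static String generatePartitionPath(LinkedHashMap<String, String> spec) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : spec.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('/');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
        spec.put("p0", "1");
        spec.put("p1", "2");
        System.out.println(generatePartitionPath(spec)); // p0=1/p1=2/
    }
}
```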

* eg: /tmp/cp-1/task-0/p0=1/p1=2/fileName.
*/
@Internal
public class TempFileManager {
Contributor:

rename to PartitionTempFileManager?

/**
* Generate a new path with directories.
*/
public Path generateTempFile(String... directories) throws Exception {
Contributor:

createPartitionDir will be more accurate, change parameter name to partitions

TASK_DIR_PREFIX + taskNumber);
}

public Path getTaskTemporaryPath() {
Contributor:

No need to expose this

Author:

This is just for the committer to clean the task temporary dir, but I think we can move this cleanup into the FileManager.


private String newFileName() {
return String.format(
checkpointName(checkpointId) + "-" + taskName(taskNumber) + "-file-%d",
Contributor:

why duplicate this info? We already have cpId and taskId in the parent path.

Author:

Eventually these files will be moved to the final directory, so carrying this information in the file name reduces name conflicts.
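To illustrate the trade-off being discussed: although the checkpoint id and task id already appear in the parent directory (e.g. `/tmp/cp-1/task-0/...`), repeating them in the file name keeps names unique after the files are moved into the final directory. A hedged sketch; the format string is illustrative, not the exact one in the PR:

```java
// Sketch: temp files carry checkpoint id and task number in their names, so
// moving files from /tmp/cp-1/task-0/... into a shared final directory
// cannot collide with files from another checkpoint or task.
public class FileNameSketch {
    public static String newFileName(long checkpointId, int taskNumber, int counter) {
        return String.format("cp-%d-task-%d-file-%d", checkpointId, taskNumber, counter);
    }

    public static void main(String[] args) {
        System.out.println(newFileName(1, 0, 3)); // cp-1-task-0-file-3
    }
}
```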

}

private static long getCheckpointId(String fileName) {
return Long.parseLong(fileName.substring(3, fileName.length()));
Contributor:

fileName.length() is useless
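The reviewer's point, assuming directory names like `cp-5` with a three-character prefix (the `cp-1` form appears in the temp path example earlier in this PR): `substring(3)` already runs to the end of the string, so the second argument is redundant.

```java
// Sketch of the simplified parse: substring(3) is equivalent to
// substring(3, dirName.length()), so the explicit end index can be dropped.
public class CheckpointIdSketch {
    public static long getCheckpointId(String dirName) {
        // "cp-".length() == 3
        return Long.parseLong(dirName.substring(3));
    }

    public static void main(String[] args) {
        System.out.println(getCheckpointId("cp-5")); // 5
    }
}
```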


private transient boolean inited;

private transient HiveShim hiveShim;
Contributor:

I understand this is just refactoring code. But HiveShim is now a Serializable. So actually we can simply hold a HiveShim instance and don't need the hiveVersion field.

new HiveMetaStoreFactory(jobConf, hiveVersion, dbName, tableName));
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitionSpec);
builder.setTmpPath(new org.apache.flink.core.fs.Path(
Contributor:

Requiring the caller to specify a temp path seems strange to me. IMO caller of the API should only care about what the final path should be and not how temp paths are generated.

Author:

Note that the temp path comes from the table location; FileSystemOutputFormat has no idea how to generate temp files. Actually, FileSystemOutputFormat only needs to know the temp dir; it doesn't need to know the final path.
Right now, how the temp directory is generated is still uncertain. You can see the Hive code below has a TODO. I don't want to bring this into FileSystemOutputFormat yet.

Contributor:

If temp path has to come from table location, e.g. as a sub-dir of table location, then the builder should ask for the table location and generate the temp path it needs. Otherwise, we should clearly define what kind of path we need in the builder contract. It's not good practice to define an API that takes an arbitrary path and implicitly rely on callers to pass something of a specific structure.

@@ -178,27 +171,6 @@ public void setStaticPartition(Map<String, String> partitionSpec) {
}
}

private void validatePartitionSpec() {
Contributor:

Why don't we need this anymore?

Author:

The planner already verifies it. It should be verified by the framework.

@Override
public void finalizeGlobal(int parallelism) throws IOException {
try {
committer.commitUpToCheckpoint(CHECKPOINT_ID);
Contributor:

Just curious, will this be invoked each time a checkpoint is done?

Reply:

Nope, finalizeGlobal is only called in JM once when a batch job finishes.

Author:

Yeah:
In batch mode, it is invoked only once, in the JM.
In streaming mode, it is invoked via CheckpointListener.notifyCheckpointComplete; you can take a look at StreamingFileSink.notifyCheckpointComplete.

* 2.{@link #loadNonPartition}: just rename all files to final output path.
*/
@Internal
public class FileSystemLoader implements Closeable {
Contributor:

Rename to FileSystemPartitionLoader or just PartitionLoader?

Author:

It is not just a partition loader; it also loads files without partitions.

Author:

But I am OK with PartitionLoader.

* @param <T> The type of the consumed records.
*/
@Internal
public class NonPartitionWriter<T> implements PartitionWriter<T> {
Contributor:

Please re-consider the naming. It's confusing that a NonPartitionWriter is a PartitionWriter.

Author:

I'll change the name to SingleDirectoryWriter.

@JingsongLi (Author):

@KurtYoung @lirui-apache Hope you take a look again.

@lirui-apache (Contributor):

Thanks @JingsongLi for the update. I suppose the purpose of introducing these abstractions is to support writing partitions to different external systems other than Hive. Can we have a summary about what a user/developer needs to implement in order to achieve that?

@JingsongLi (Author):

Thanks @JingsongLi for the update. I suppose the purpose of introducing these abstractions is to support writing partitions to different external systems other than Hive. Can we have a summary about what a user/developer needs to implement in order to achieve that?

Hi @lirui-apache, there is no plan to support other external systems at present or in the foreseeable future.
There are only two implementations:

  • Hive
  • The Flink file system connector (note: this is just one implementation; formats only need to implement input/output formats).

So I don't think we need to expose it to users, and I think code comments are better than documents.

As a summary, a developer actually just needs to implement:

  • OutputFormatFactory
  • TableMetaStoreFactory

You can take a look at FileSystemOutputFormat.Builder; only these two are necessary.
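The two extension points named above can be pictured with a minimal sketch. The interface shapes below are hypothetical simplifications invented for illustration (the real OutputFormatFactory and TableMetaStoreFactory in the PR have different signatures and return Flink types):

```java
import java.io.Serializable;

// Hypothetical, simplified shapes of the two extension points a developer
// supplies to FileSystemOutputFormat.Builder. String stands in for the real
// OutputFormat and TableMetaStore objects.
public class ExtensionPointsSketch {
    interface OutputFormatFactory<T> extends Serializable {
        String createOutputFormat(String path); // real code: creates an OutputFormat for a path
    }

    interface TableMetaStoreFactory extends Serializable {
        String createTableMetaStore(); // real code: connects to the table's metastore
    }

    public static void main(String[] args) {
        OutputFormatFactory<String> formats = path -> "format@" + path;
        TableMetaStoreFactory metaStore = () -> "metastore";
        System.out.println(formats.createOutputFormat("/tmp/p0=1"));
        System.out.println(metaStore.createTableMetaStore());
    }
}
```

Both factories extend Serializable because, as discussed earlier in this thread, they are shipped from the client to the tasks.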

@lirui-apache (Contributor) left a comment:

Thanks @JingsongLi for the explanation. LGTM.

@KurtYoung (Contributor):

I will have a final pass

@KurtYoung (Contributor) left a comment:

Almost LGTM, I only have some structure related comments.

return this;
}

public Builder<T> setPartitionComputer(PartitionComputer<T> computer) {
Contributor:

never used function

Author:

In the future we can have BaseRowPartitionComputer.

Object field = in.getField(index);
String partitionValue = field != null ? field.toString() : null;
if (partitionValue == null || "".equals(partitionValue)) {
partitionValue = defaultPartName;
Contributor:

defaultPartName actually looks like defaultPartValue to me

Author:

This comes from the Hive world, but I can change it to defaultPartValue.

if (computer == null) {
if (conversionClass == Row.class) {
//noinspection unchecked
computer = (PartitionComputer<T>) new RowPartitionComputer(
Contributor:

why not just let HiveTableSink set RowPartitionComputer? It looks quite hacky here.

Author:

OK, let's use setPartitionComputer.

PartitionTempFileManager manager,
PartitionComputer<T> computer) throws Exception {
this.computer = computer;
this.format = context.createNewOutputFormat(manager.getStaticPartSpecs().size() == 0 ?
Contributor:

I think we can find a way to pass in the static partition specs directly without relying on the PartitionTempFileManager

* Util for get a {@link PartitionWriterFactory}.
*/
static <T> PartitionWriterFactory<T> get(
boolean dynamicPartition, boolean grouped) {
Contributor:

Providing the partition columns and static partition specs would let us pass the necessary information to the corresponding writers.

Author:

Good suggestion; the static partition specs are enough.

private final int taskNumber;
private final long checkpointId;
private final Path taskTmpDir;
private final LinkedHashMap<String, String> staticParts;
Contributor:

This field is not necessary

@Override
public void open(int taskNumber, int numTasks) throws IOException {
try {
PartitionTempFileManager fileManager = committer.createTempFileManager(
Contributor:

I think this class can directly create PartitionTempFileManager, no need to go through committer.

Author:

OK, let's completely separate task related things from committer.

@JingsongLi (Author):

Thanks @KurtYoung for your review, updated.

@KurtYoung KurtYoung closed this in 035a233 Dec 3, 2019
Li-Aihua pushed a commit to Li-Aihua/flink that referenced this pull request Jan 19, 2020
@JingsongLi JingsongLi deleted the batchFile branch April 26, 2020 05:33