[FLINK-33059] Support transparent compression for file-connector for all file input formats #23443

Merged

echauchot
Contributor

What is the purpose of the change

Support transparent compression for file-connector for all file input formats.

Brief change log

  • Force reading the whole file split (as is done for binary input formats) on compressed (unsplittable) files
  • Add FileInputFormatTest#testFileInputFormatWithCompressionFromFileSource
  • Generalize FileInputFormatTest#testFileInputFormatWithCompression to more than the deflate format
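
To illustrate the first item: a compressed file cannot be split, and its on-disk size says nothing about how many bytes the decompressor will produce, so the reader has to consume the stream to its end instead of stopping at the split length. Below is a minimal sketch of that decision. The class, the method names, the extension set, and the -1 sentinel are all assumptions made for illustration; they are not copied from the code in this PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch; class, method, and constant names are illustrative, not Flink's API.
final class CompressedSplitSketch {

    // Sentinel meaning "ignore the split length and read until end of stream" (assumed value).
    static final long READ_WHOLE_SPLIT = -1L;

    // Illustrative subset of extensions treated as compressed (assumption).
    static final Set<String> COMPRESSED_EXTENSIONS =
            Collections.unmodifiableSet(
                    new HashSet<>(Arrays.asList("deflate", "gz", "gzip", "bz2", "xz")));

    // Returns the lower-cased text after the last dot, or "" if there is no extension.
    static String extensionOf(String fileName) {
        int dot = fileName.lastIndexOf('.');
        if (dot < 0 || dot == fileName.length() - 1) {
            return "";
        }
        return fileName.substring(dot + 1).toLowerCase(Locale.ROOT);
    }

    // A file counts as compressed (and therefore unsplittable) if its extension is known.
    static boolean isCompressed(String fileName) {
        return COMPRESSED_EXTENSIONS.contains(extensionOf(fileName));
    }

    // Compressed files get the sentinel; other files keep the requested split length.
    static long effectiveSplitLength(String fileName, long splitLength) {
        return isCompressed(fileName) ? READ_WHOLE_SPLIT : splitLength;
    }
}
```

With this sketch, `effectiveSplitLength("data.csv.gz", 1024L)` yields the sentinel, while `effectiveSplitLength("data.csv", 1024L)` keeps the requested 1024 bytes.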

Verifying this change

FileInputFormatTest#testFileInputFormatWithCompressionFromFileSource

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? website

@echauchot echauchot changed the title [Flink-33059] Support transparent compression for file-connector for all file input formats [FLINK*-33059] Support transparent compression for file-connector for all file input formats Sep 20, 2023
@echauchot echauchot changed the title [FLINK*-33059] Support transparent compression for file-connector for all file input formats [FLINK-33059] Support transparent compression for file-connector for all file input formats Sep 20, 2023
@flinkbot
Collaborator

flinkbot commented Sep 20, 2023

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@echauchot echauchot assigned fhueske and rmetzger and unassigned fhueske Sep 20, 2023
@echauchot echauchot force-pushed the FLINK-33059-compression-fileInputFormat branch from f52b5a0 to 6283307 Compare September 25, 2023 07:57
@echauchot
Contributor Author

Hi @rmetzger, I saw that you authored parts of this code. Could you please review it or point me to another reviewer?

@echauchot
Contributor Author

R: @xintongsong I see your name in that code history, would you have time to take a look ?

@echauchot
Contributor Author

@tzulitai you offered help for reviewing, don't hesitate to ping me on this when you have time.

Contributor

@ferenc-csaky ferenc-csaky left a comment

The added logic makes sense IMO.

Added 2 comments, but in general I would consider separating the whole INFLATER_INPUT_STREAM_FACTORIES into a new class, as there are a couple of functions that use it and it seems quite detachable from FileInputFormat.

After a quick peek I think something like the following could work:

public class InflaterInputStreamFactories {

  public static void register(String fileExt, InflaterInputStreamFactory<?> factory) { ... }

  public static InflaterInputStreamFactory<?> get(Path path) { ... }

  private static InflaterInputStreamFactory<?> get(String fileExt) { ... }

  @VisibleForTesting
  public static Set<String> getSupportedCompressionFormats() { ... }
}

Also, ConcurrentHashMap can be utilized instead of the synchronized block, but other than that the current logic could be moved as is.

This probably goes beyond the current PR, but I think it is worth noting. WDYT?

@@ -157,6 +157,10 @@ protected static InflaterInputStreamFactory<?> getInflaterInputStreamFactory(
}
}

public static Set<String> getSupportedCompressionFormats() {
Contributor

I'd mark this @VisibleForTesting, because only tests use this function.

Contributor Author

👍 thx for pointing out

@@ -136,6 +138,26 @@ public static String createTempFileDirExtension(
return f.toURI().toString();
}

public static String createTempTextFileDirForAllCompressionFormats(File tempDir)
Contributor

I think instead of bringing the specific FileInputFormat into this general utility, it would be cleaner to pass Set<String> extensions as a parameter.

Contributor Author

Agree, better to be more generic here, thx !
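
For readers following the thread, the generic variant discussed here could look roughly like the sketch below: the utility receives the extensions from the caller instead of asking a specific input format for them. The class name, method name, and file naming are hypothetical, and the files hold plain placeholder text; a real test helper would write content compressed in the matching format.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;

// Hypothetical sketch of a format-agnostic test utility; names are illustrative.
final class TempFileUtilSketch {

    // Creates a fresh directory under tempDir holding one file per extension,
    // and returns the directory as a URI string.
    static String createTempTextFileDirForExtensions(File tempDir, Set<String> extensions) {
        try {
            Path dir = Files.createTempDirectory(tempDir.toPath(), "compressed-");
            for (String ext : extensions) {
                Path file = dir.resolve("test_file." + ext);
                // Placeholder content only; this sketch does not compress anything.
                Files.write(file, "placeholder".getBytes(StandardCharsets.UTF_8));
            }
            return dir.toFile().toURI().toString();
        } catch (IOException e) {
            // Rethrow unchecked so test code can call this without a throws clause.
            throw new UncheckedIOException(e);
        }
    }
}
```

The caller decides which extensions to cover, for example by passing the set returned by getSupportedCompressionFormats() from the test itself, which keeps the utility free of any dependency on a specific input format.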

@echauchot
Contributor Author

The added logic makes sense IMO.

Added 2 comments, but in general I would consider separating the whole INFLATER_INPUT_STREAM_FACTORIES into a new class, as there are a couple of functions that use it and it seems quite detachable from FileInputFormat.

After a quick peek I think something like the following could work:

public class InflaterInputStreamFactories {

  public static void register(String fileExt, InflaterInputStreamFactory<?> factory) { ... }

  public static InflaterInputStreamFactory<?> get(Path path) { ... }

  private static InflaterInputStreamFactory<?> get(String fileExt) { ... }

  @VisibleForTesting
  public static Set<String> getSupportedCompressionFormats() { ... }
}

Also, ConcurrentHashMap can be utilized instead of the synchronized block, but other than that the current logic could be moved as is.

This probably goes beyond the current PR, but I think it is worth noting. WDYT?

I agree with the ConcurrentHashMap suggestion. Creating a class just to wrap a map that is used only in FileInputFormat seems overkill to me. And anyway, it is indeed outside the scope of this file-size fix PR.
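
As a rough illustration of the ConcurrentHashMap idea, here is a self-contained sketch of an extension-to-factory registry that needs no synchronized block. `StreamDecorator` is a stand-in interface declared only so the example compiles on its own; it is not Flink's InflaterInputStreamFactory, and the registry class and method names are likewise hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for a decompression factory; declared here only to keep the sketch self-contained.
interface StreamDecorator {
    InputStream create(InputStream in) throws IOException;
}

// Hypothetical registry; a ConcurrentHashMap makes put/get thread-safe without explicit locking.
final class DecoratorRegistrySketch {

    private static final Map<String, StreamDecorator> FACTORIES = new ConcurrentHashMap<>();

    // Registers (or replaces) the factory for a file extension.
    static void register(String fileExt, StreamDecorator factory) {
        FACTORIES.put(fileExt, factory);
    }

    // Returns the factory for the extension, or null if none is registered.
    static StreamDecorator get(String fileExt) {
        return FACTORIES.get(fileExt);
    }

    // Returns an unmodifiable snapshot of the registered extensions.
    static Set<String> supportedExtensions() {
        return Collections.unmodifiableSet(new HashSet<>(FACTORIES.keySet()));
    }
}
```

Note that ConcurrentHashMap rejects null keys and values, so a lookup with a null extension throws instead of returning null; a caller resolving extensions from paths would need to handle files without an extension before calling `get`.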

@echauchot
Contributor Author

@ferenc-csaky thanks for reviewing this PR ! I have addressed your comments, do I have your LGTM when the tests pass ?

@echauchot echauchot force-pushed the FLINK-33059-compression-fileInputFormat branch from ec89f4e to a0a6a7a Compare November 8, 2023 16:45
Contributor

@ferenc-csaky ferenc-csaky left a comment

LGTM, if tests pass! 👍

@echauchot echauchot force-pushed the FLINK-33059-compression-fileInputFormat branch from a0a6a7a to 6f0b256 Compare November 9, 2023 10:01
…on't use FileInputFormat#createSplits. If input files are compressed, ensure that the size of the split is not the compressed file size and that the compression decorator is called.
@echauchot echauchot force-pushed the FLINK-33059-compression-fileInputFormat branch from 6f0b256 to 3ad7e33 Compare November 9, 2023 14:08
@ruslandanilin

Thank you @echauchot !

@echauchot
Contributor Author

Thank you @echauchot !

My pleasure ! Merging

@echauchot echauchot merged commit 4ccb2f6 into apache:master Nov 10, 2023
@echauchot echauchot deleted the FLINK-33059-compression-fileInputFormat branch November 10, 2023 08:49