Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-9545] Support read a file multiple times in Flink DataStream #6130

Closed
wants to merge 1 commit into from
Closed

[FLINK-9545] Support read a file multiple times in Flink DataStream #6130

wants to merge 1 commit into from

Conversation

bowenli86
Copy link
Member

@bowenli86 bowenli86 commented Jun 7, 2018

What is the purpose of the change

Motivation: We have the requirements to read a bunch files, each file to read multiple times, to feed our streams

Specifically we need StreamExecutionEnvironment.readFile/readTextFile to be able to read a file for a specified N times, but currently it only supports reading file once.

We've implemented this internally. Would be good to get it back to the community version. This jira is to add support for the feature.

Brief change log

  • add a new processing mode as PROCESSING_N_TIMES
  • add additional parameter numTimes for StreamExecutionEnvironment.readFile/readTextFile

Verifying this change

This change is already covered by existing tests, such as (please describe tests).

Does this pull request potentially affect one of the following parts:

  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs / JavaDocs)

*/
public DataStreamSource<String> readTextFile(String filePath, String charsetName, int numTimes) {
Preconditions.checkNotNull(filePath, "The file path must not be null.");
Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be Preconditions.checkArgument(filePath.isEmpty(), "The file path must not be empty.");?
BTW, maybe we could use the Strings.isNullOrEmpty(filePath) to merge this two checks into one.

} catch (Exception e) {
throw new InvalidProgramException("The type returned by the input format could not be " +
"automatically determined. Please specify the TypeInformation of the produced type " +
"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it better to not swallow the Exception here.


Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
Preconditions.checkNotNull(filePath, "The file path must not be null.");
Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, looks like should be Preconditions.checkArgument().

FileProcessingMode watchType,
long interval,
TypeInformation<OUT> typeInformation,
int numTimes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could numTimes be negative? If not, we could use the @Nonnegative annotation, or check it in this function.

String.format("The path monitoring interval cannot be less than %d ms in %s mode.",
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL, FileProcessingMode.PROCESS_CONTINUOUSLY));
break;
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about raising an IllegalArgumentException here?

@aljoscha
Copy link
Contributor

aljoscha commented Jun 7, 2018

@bowenli86
Copy link
Member Author

@aljoscha

Motivation: We have the requirements to read a bunch files, each file to read multiple times, to feed our streams

Specifically we need StreamExecutionEnvironment.readFile/readTextFile to be able to read a file for a specified N times, but currently it only supports reading file once.

We've implemented this internally. Would be good to get it back to the community version. This jira is to add support for the feature.

@zentol
Copy link
Contributor

zentol commented Jun 7, 2018

You're effectively only explaining what this feature is, but not why it is actually needed. We have to gauge whether this feature is useful for other users as well before we decide to maintain it.

@kl0u
Copy link
Contributor

kl0u commented Jun 13, 2018

Hi @bowenli86 !

Why not having a flatmap after the readFile and for every incoming element you emit as many copies as you want?

Personally, I am not so fond of adding methods to the public APIs for such specific usecases.

@bowenli86
Copy link
Member Author

bowenli86 commented Jun 13, 2018

Not really. It's not about having n copies of data. One use case is file-fed stream pipeline usually runs very fast with inadequate metrics, users need to run it end-to-end for a longer time to gather stable metrics and tune all components in the pipeline.

I'll close it if community is not interested. @kl0u

@kl0u
Copy link
Contributor

kl0u commented Jun 15, 2018

Hi @bowenli86,
Me, @zentol and @aljoscha seem to have doubts about the utility of the feature.

So given this, and to have a clean JIRA and list of PRs we have to work on, I would suggest
to close the PR and the related issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants