New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-8471] Flink native job submission for portable pipelines #9872
[BEAM-8471] Flink native job submission for portable pipelines #9872
Conversation
R: @chadrik |
Thanks for the heads up. I don't have any input on this, but I do like staying in the loop on this subject! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @tweise. Looks good! A couple of minor comments and clarifications inline.
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
|
||
private void runDriverProgram() throws Exception { | ||
ProcessManager processManager = ProcessManager.create(); | ||
String executable = "bash"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may want to make this executable configurable, for non bash users. How about putting executable
and args
in the config with the current defaults?
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
private int jobPort = 0; // any free port | ||
|
||
public FlinkPortableClientRunner(String driverCmd) { | ||
this.driverCmd = driverCmd; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.driverCmd = driverCmd; | |
Preconditions.checkState(!driverCmd.contains(DRIVER_CMD_FLAGS), "Driver command must not contain "+ DRIVER_CMD_FLAGS); | |
this.driverCmd = driverCmd; |
*/ | ||
public class FlinkPortableClientRunner { | ||
private static final Logger LOG = LoggerFactory.getLogger(FlinkPortableClientRunner.class); | ||
private static final String DRIVER_CMD_FLAGS = "--job_endpoint=%s"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private static final String DRIVER_CMD_FLAGS = "--job_endpoint=%s"; | |
private static final String JOB_ENDPOINT_FLAG = "--job_endpoint"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This allows us to check for the args in the user command string. The %s
would have to be appended in the format string.
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
Outdated
Show resolved
Hide resolved
* | ||
* <p>Finally Flink launches the job. | ||
*/ | ||
public class FlinkPortableClientEntryPoint { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mxm any preference regarding class name? I first had it as FlinkPortableClientRunner
but maybe "runner" is misleading. On the other hand, "entry point" is also redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about just FlinkPortableRunner
? The functionality is closest to the classic FlinkRunner
, as it allows for any portable pipeline to be submitted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had it as XYZRunner originally, but figured it will cause confusion. There are FlinkPipelineRunner
and PortablePipelineRunner
already (these are used within the job server).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could rename those 🤓
* | ||
* <p>Finally Flink launches the job. | ||
*/ | ||
public class FlinkPortableClientEntryPoint { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about just FlinkPortableRunner
? The functionality is closest to the classic FlinkRunner
, as it allows for any portable pipeline to be submitted.
this.driverCmd = driverCmd; | ||
} | ||
|
||
/** Main method to be called within the Flink OptimizerPlanEnvironment. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is obsolete, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, updated now.
|
||
@Ignore("not working yet") | ||
@Test | ||
public void testInheritIO() throws IOException, InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mxm I see the inherit working as the expected output is printed in the console. But short of starting a separate java process I don't see a way to capture it in the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left the test but skipped the assertion, maybe we can still find a clever way to do this.
3b5dce8
to
f89c1b9
Compare
…program. (#27) Upstream PR: apache#9872
Add a new Flink entry point (main method) that invokes the external SDK client entry point to generate the pipeline and submits the resulting Flink job like any other Flink native driver program would, via the optimizer plan environment ("[auto]").
Note that in this PR, the SDK client is assumed to be on the same host, which is the case when Flink and Python dependencies are in the same container image, for example. While this is something that can be solved at build time, the question from Robert made me realize that the dependency is almost identical to that between the runner and SDK on the execution side, which is abstracted via the environment concept. So we could, in the future, consider introducing a "client environment" or simply expand the existing environment. This would allow the SDK bits for both pipeline construction and execution to live in a side car container, separate from Flink.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.