[BEAM-10435] Add ValidatesRunner task for local_job_service and Java SDK harness #11792

Merged
merged 6 commits into from Jul 14, 2020

Member

@kennknowles kennknowles commented May 22, 2020

This adds a ValidatesRunner suite for the Java SDK against the local Python ULR.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • [n/a] Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Member Author

@kennknowles kennknowles left a comment

I got this to run and hit some errors, but debugging is pretty awful. I never did get the local_job_service to actually log everything to Gradle that it does to the console. To debug this, it is easiest to run a separate job service and point the build at it with -PlocalJobServicePortFile.

I got two different errors at different times, one of which looks like a Python URN being sent to the Java SDK harness and the other like a Java URN being resolved by the Python runner (?)

RuntimeError: java.lang.IllegalStateException: No factory registered for beam:transform:read_from_impulse_python:v1, known factories [beam:transform:window_into:v1, beam:runner:source:v1, beam:runner:sink:v1, beam:source:java:0.1, beam:transform:read:v1, beam:transform:combine_per_key_precombine:v1, beam:transform:combine_per_key_merge_accumulators:v1, beam:transform:combine_per_key_extract_outputs:v1, beam:transform:combine_per_key_convert_to_accumulators:v1, beam:transform:combine_grouped_values:v1, beam:transform:flatten:v1, beam:transform:pardo:v1, beam:transform:sdf_pair_with_restriction:v1, beam:transform:sdf_split_restriction:v1, beam:transform:sdf_split_and_size_restrictions:v1, beam:transform:sdf_process_elements:v1, beam:transform:sdf_process_sized_element_and_restrictions:v1, beam:transform:map_windows:v1, beam:transform:merge_windows:v1]
	at org.apache.beam.fn.harness.control.ProcessBundleHandler$UnknownPTransformRunnerFactory.createRunnerForPTransform(ProcessBundleHandler.java:792)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:236)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:198)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:491)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:275)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:552)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:270)
	at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
	at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/local_job_service.py", line 280, in _run_job
    self._pipeline_proto)
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 331, in run_stages
    bundle_context_manager,
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 508, in _run_stage
    bundle_manager)
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 546, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 930, in process_bundle
    timer_inputs)):
  File "/Users/klk/.virtualenvs/ulr-vr/lib/python2.7/site-packages/concurrent/futures/_base.py", line 641, in result_iterator
    yield fs.pop().result()
  File "/Users/klk/.virtualenvs/ulr-vr/lib/python2.7/site-packages/concurrent/futures/_base.py", line 462, in result
    return self.__get_result()
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/utils/thread_pool_executor.py", line 44, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 926, in execute
    dry_run)
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 859, in process_bundle
    output.transform_id).append(output.data)
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py", line 617, in get_buffer
    pcollections[output_pcoll].windowing_strategy_id])
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 137, in __getitem__
    return self.get_by_id(id)
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/transforms/core.py", line 2396, in from_runner_api
    windowfn=WindowFn.from_runner_api(proto.window_fn, context),
  File "/Users/klk/GitHub/apache/beam/sdks/python/apache_beam/utils/urns.py", line 186, in from_runner_api
    parameter_type, constructor = cls._known_urns[fn_proto.urn]
KeyError: u'beam:window_fn:serialized_java:v1'

Any idea where I should start debugging for an inversion of Java vs Python?
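
Both failures have the same shape: a component looks up a URN in a registry that only knows the URNs of its own language. The sketch below is a simplified model of that lookup, not Beam's actual code. The `UrnRegistry` class and the `example:` URN are hypothetical; only `beam:window_fn:serialized_java:v1` comes from the traceback above.

```python
class UrnRegistry:
    """Toy URN -> constructor table (hypothetical, not Beam's implementation)."""

    def __init__(self, owner):
        self.owner = owner
        self._known_urns = {}

    def register(self, urn, constructor):
        self._known_urns[urn] = constructor

    def from_runner_api(self, urn, payload):
        try:
            constructor = self._known_urns[urn]
        except KeyError:
            # Report what is registered, as the Java harness error does.
            known = ", ".join(sorted(self._known_urns))
            raise LookupError(
                "%s has no factory for %s, known URNs: [%s]"
                % (self.owner, urn, known)
            ) from None
        return constructor(payload)


# A Python-side registry that only knows a Python-defined window fn.
python_runner = UrnRegistry("python runner")
python_runner.register(
    "example:window_fn:python_only:v1", lambda payload: "python-only window fn"
)
```

Asking `python_runner` for `beam:window_fn:serialized_java:v1` fails in this model for the same reason as the `KeyError` above: the payload is only meaningful to the SDK that produced it.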

@@ -39,7 +39,7 @@
 public class ArtifactRetrievalService
     extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase implements FnService {

-  public static final int DEFAULT_BUFFER_SIZE = 4 << 20; // 4 MB
+  public static final int DEFAULT_BUFFER_SIZE = 2 << 20; // 2 MB
Member Author

@robertwb @lukecwik first thing I hit putting this together was exceeding message size limit
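
A minimal sketch of why a smaller buffer helps, assuming the limit being hit is a per-message cap such as gRPC's default 4 MiB maximum message size (the thread does not say which limit it was). `iter_chunks` is a hypothetical helper, not part of Beam; it splits a payload so that no single chunk exceeds the buffer size.

```python
DEFAULT_BUFFER_SIZE = 2 << 20  # 2 MiB, the new value in the diff above


def iter_chunks(data, chunk_size=DEFAULT_BUFFER_SIZE):
    """Yield consecutive slices of `data`, each at most `chunk_size` bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]
```

With a chunk size equal to the message cap, any framing overhead added around the chunk can push the message over the limit, which is one plausible reason to leave headroom.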

@@ -1,3 +1,13 @@
import groovy.json.JsonOutput

import java.nio.file.FileSystems
Member Author

TODO(me): remove these imports

I first went through the "normal" route of using all this stuff to watch for the pid file but it was verbose and had race conditions. No point. Just check and sleep, now.
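
A check-and-sleep loop of the kind described here can be bounded by a deadline so that it cannot hang forever. This is a sketch only; `wait_for_file` and its default timings are hypothetical and do not come from the PR.

```python
import os
import time


def wait_for_file(path, timeout_secs=30.0, poll_secs=0.5):
    """Return True once `path` exists, or False if `timeout_secs` elapses first."""
    deadline = time.monotonic() + timeout_secs
    while not os.path.exists(path):
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_secs)
    return True
```

Returning a boolean rather than raising leaves it to the caller to decide whether a missing pid file should fail the build.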

Contributor

You shouldn't have to watch, when the --background flag is set it waits for the service to be up before terminating.

Member Author

Great, I'll remove that. I think in an early draft I made the Gradle task not wait for the process to terminate (because the daemonized process was causing the hang, but that turned out to be a different configuration).

includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
}
filter {
includeTestsMatching 'ImpulseTest'
Member Author

TODO(me): remove this once we get past the sanity checking phase

runners/portability/java/build.gradle
* @param options Properties which configure the runner.
* @return The newly created runner.
*/
public static TestUniversalRunner fromOptions(PipelineOptions options) {
Member Author

Had to add this, because TestPortableRunner couples "check that the job succeeds" logic with a bunch of other things having to do with launching an existing Java runner as a portable runner, not relevant to actual portable runner services.

proc.waitFor();
}

task virtualenv {
Member Author

I tried registering outputs.dir virtualenvDir and inputs.dir pythonSdkDir but it did not result in incremental build.

Contributor

Can we use the existing virtualenv tasks we have?

Member Author

I didn't use them because I was attempting to do more proper idempotent/cached gradle tasks.

Member Author

Oh hey, now I recall: applyPythonNature causes conflicting configurations. Really would like to move away from these nature things.

Member

I wouldn't want Beam to move to a build setup where each gradle file does its own thing because the fragmentation will hurt debugging build issues and slow down rolling out build changes that impact more than one project.

One example where we decided to split a common setup was between releasing java projects and releasing vendored projects, which led to fixes that weren't done in both places, leading to bugs that lasted for months.

Member Author

I'm not suggesting moving each build.gradle to do their own thing. Beam Java modules and vendored libraries can share as much code as they like. They shouldn't share an entrypoint because they are different things.

In this case, the code suggested to be shared is coupled with creating a Beam Python module, which this is not. "Do Python things" is not an adequate or meaningful abstraction. Applying the vague blanket logic is a liability, even if it worked here, which it does not. It is likely that I can do some tweaks to applyPythonNature to "make it work", but that would be bad engineering.

  • Adding near-duplicate code when maybe there is an abstraction: tech debt
  • Adding a dependency on something that isn't ready/meant for it: tech debt

I interpret Robert's comment as an invitation to improve our build code into some kind of meaningful abstraction that can be shared without incurring yet more tech debt.

Member Author

(I will explore this invitation once the tests are running properly)

Member Author

(Adding a dependency on something that is ready and is meant for it, but is not polished and named to make clear that it is a logical necessity: tech debt)

File pidFile = new File(localJobServicePidFile)
int totalSleep = 0
while (!pidFile.exists()) {
sleep(500)
Contributor

We shouldn't have to wait, once the above exec completes it should be there (or not). Does the above task error if the return code is non-zero?

Member Author

Removed. This code was left over from when I was struggling to get gradle to allow the thing to daemonize itself.

 subprocess.Popen([
     sys.executable,
     '-m',
     'apache_beam.runners.portability.local_job_service_main'
-] + argv)
+] + argv,
+    stderr=stderr_dest,
Contributor

I'm not sure what happens here when this process exits (and possibly tries to close these files?).

Member Author

I didn't read the subprocess code, and the docs are vague. The special subprocess.STDOUT token indicates that the output should be "captured" into the same file handle. Comments at https://stackoverflow.com/questions/31980411/closing-files-from-subprocess-stdout imply that closing the file is the responsibility of this process. I did not run the experiments suggested there. I also did not try to refactor this code to allow a with statement.
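
The experiment is small enough to sketch. The function below is hypothetical (it is not the code in this PR): it starts a child that writes to a log file after a short delay, closes the parent's handle right away, and then reads back what the child wrote. The child holds its own duplicate of the descriptor, so the parent's `close()` should not affect it.

```python
import subprocess
import sys


def run_child_after_closing_parent_handle(log_path):
    """Start a child writing to `log_path`, close our handle, then wait for it."""
    log_file = open(log_path, mode="w")
    child = subprocess.Popen(
        [sys.executable, "-c",
         "import time; time.sleep(0.2); print('child still writing')"],
        stdout=log_file,
        stderr=subprocess.STDOUT,  # send stderr to the same destination
    )
    # Closing the parent's handle does not close the child's inherited copy.
    log_file.close()
    child.wait()
    with open(log_path) as f:
        return f.read()
```

If the child's output shows up in the file even though the parent closed first, the parent exiting (which closes its descriptors) is equally harmless to the background process.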

@kennknowles
Member Author

Once I get the suite running I can do some refactors to try to share with setupVirtualEnv and clear out the extraneous sleepwaiting and whatnot.

@kennknowles
Member Author

Found a lot of exclusions before I started getting my disks filled by the build & local runner.

@robertwb you may be interested in the failures where it seems empty side inputs don't work

@kennknowles kennknowles force-pushed the portable-java-vr branch 2 times, most recently from c466855 to f156314 Compare July 9, 2020 21:36
@kennknowles kennknowles changed the title WIP: Add ValidatesRunner task for local_job_service and Java SDK harness Add ValidatesRunner task for local_job_service and Java SDK harness Jul 9, 2020
@kennknowles
Member Author

PTAL. I have a final set of exclusions, categorized, with all other tests passing locally. The local run took about an hour. I have adjusted applyPythonNature to be compatible with applyJavaNature and reused the code despite my misgivings. I did not refactor the build support code.

@kennknowles
Member Author

I have curated the commits into orthogonal changes, if you wish to focus your review.

@kennknowles kennknowles changed the title Add ValidatesRunner task for local_job_service and Java SDK harness [BEAM-10435] Add ValidatesRunner task for local_job_service and Java SDK harness Jul 9, 2020
@kennknowles kennknowles force-pushed the portable-java-vr branch 4 times, most recently from e7254e2 to da4acef Compare July 9, 2020 22:39
@kennknowles
Member Author

https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/13653/ appears to be a failure in unrelated code. It does not look like something that would flake. Perhaps master got broken.

@kennknowles
Member Author

Run Python PreCommit

Contributor

@robertwb robertwb left a comment

Nice!

"--stdout_file=${localJobServiceStdoutFile}",
"--pid_file=${localJobServicePidFile}",
"--port_file=${localJobServicePortFile}"
//
Contributor

These lines can be removed now, right?

Member Author

Done

task ulrValidatesRunnerTests(type: Test) {
dependsOn ":sdks:java:container:docker"

if (!project.hasProperty("localJobServicePortFile")) {
Contributor

Very convenient.

"--runner=TestUniversalRunner",
"--experiments=beam_fn_api",
"--localJobServicePortFile=${localJobServicePortFile}"
])
Contributor

Consider passing --defaultEnvironmentType=LOOPBACK. You can then remove the docker dependency as well. (Maybe we could run one test with docker, but all of them seems overkill and expensive.)

Member Author

I rather specifically want the docker dependency, to have a test of the true Java SDK harness container without the complexity of a production runner. But that can be postcommit and if LOOPBACK is faster and easier to debug that's good for precommit. I'd like to leave as-is to avoid churning this PR, but will follow up and create a LOOPBACK version prior to creating any Jenkins job.

Contributor

Ack. I don't think it makes sense to run every test to verify the container/harness setup works (this is reminiscent of other threads) but definitely agree these choices can be postponed while we get this PR in.
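
The two variants discussed in this thread differ by a single option, so a follow-up could plausibly share one option list. The sketch below is an illustration, not the build script: `build_pipeline_options` is a hypothetical helper, and encoding the list as JSON is an assumption based on the `JsonOutput` import in the Gradle file. The flag strings themselves are the ones quoted in this thread.

```python
import json


def build_pipeline_options(port_file, environment_type=None):
    """Return the test pipeline options as a JSON-encoded list of flags."""
    options = [
        "--runner=TestUniversalRunner",
        "--experiments=beam_fn_api",
        "--localJobServicePortFile=%s" % port_file,
    ]
    # Leaving environment_type unset keeps the current behavior of this PR;
    # passing "LOOPBACK" gives the variant suggested in the review.
    if environment_type is not None:
        options.append("--defaultEnvironmentType=%s" % environment_type)
    return json.dumps(options)
```

A precommit task could call this with `environment_type="LOOPBACK"` while a postcommit task leaves it unset and keeps the docker dependency.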

excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
}
filter {
// There is not currently a category for excluding these _only_ in committed mode
Contributor

Could you create JIRAs for these (at whatever granularity seems appropriate) and add them here?

Member Author

Done. Also added a new Jira component runner-universal since I did not find one, in case there's a need to search for these.

result.waitUntilFinish(),
Matchers.is(PipelineResult.State.DONE));
return result;
} catch (IOException e) {
Contributor

Nit: I'd make this more local (it's thrown only at Files.readAllBytes).

Member Author

Done

* A file containing the job service port, since Gradle needs to know this filename statically
* to provide it in Beam testing options.
*/
@Description("File containing local job service port.")
Contributor

Logically, it would make sense to let this be optional (e.g. one could instead provide jobEndpoint directly). A point could be made that the testing infrastructure should be the one reading the file and setting jobEndpoint, rather than passing the file path as an option (but I don't know how much messier that'd make things).

Member Author

Yea, this sort of thing is what took 90% of the time for this PR actually. Scraping around Gradle's docs and the internet for ways to insert that little bit of logic, because realizing it was sort of against the grain. Pipeline options are passed as a Java system property, and those are set up in the Gradle graph construction phase. More generally, there's not a Gradle graph execution-time slot for free-form code that also re-uses the Test task type. Perhaps they expect you to use inheritance and make a new Task type. Which I would rather not do ;_;

It would be fine to have two pipeline options, so that simple use could be simple.

Contributor

We already have the plain jobEndpoint option, just make (re)setting it conditional on LocalJobServicePortFile being set.

Member Author

Done
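
The agreed behavior can be sketched as follows. The actual change is in the Java `TestUniversalRunner`; this Python version only illustrates the logic, and `resolve_job_endpoint` plus the `localhost` host name are assumptions. It also keeps the error handling local to the file read, in the spirit of the earlier nit about `Files.readAllBytes`.

```python
def resolve_job_endpoint(job_endpoint=None, port_file=None):
    """Return the endpoint to use; a port file, when given, overrides job_endpoint."""
    if port_file is None:
        if job_endpoint is None:
            raise ValueError("either job_endpoint or port_file must be set")
        return job_endpoint
    # Keep the try block local to the only statements that do file I/O.
    try:
        with open(port_file) as f:
            port_text = f.read()
    except OSError as e:
        raise RuntimeError("could not read port file %s" % port_file) from e
    # Assumption: the local job service listens on localhost.
    return "localhost:%d" % int(port_text.strip())
```

With this shape, simple use stays simple: a caller that already knows the endpoint never has to create a port file.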


if not options.stdout_file:
raise RuntimeError('--stdout_file must be specified with --background')
stdout_dest = open(options.stdout_file, mode='w')
Contributor

I think I mentioned this before, but is it an issue that these file descriptors might get closed on completion of this process?

Member Author

Yea, I think my rebasing clobbered that thread. It is not an issue. Parent file descriptors are not closed. You can find some links, I think, on the PR front page, which should still have the prior conversation.

(I won't rebase from here on out, until review is done)

Contributor

No problem. My comment was from something like a month ago. Thanks for the references.

Contributor

@robertwb robertwb left a comment

LGTM. This is great.

@kennknowles
Member Author

Flakes in this run:

And then I just introduced an actual compile error in my fixups.

@kennknowles
Member Author

On the portable precommit, 4 gradle scans succeeded and then the Gradle daemon crashed.

@kennknowles
Member Author

Run Portable_Python PreCommit

@kennknowles
Member Author

(it could have been interrupted, I didn't dig too deep)

@kennknowles
Member Author

Flakes this time in Java precommit:

@kennknowles
Member Author

Run Java PreCommit

@kennknowles
Member Author

OK, this is a known flake (https://issues.apache.org/jira/browse/BEAM-10470) in JdbcDriverTest, totally unrelated to this and shouldn't even be running. At this point I feel comfortable merging this so things can move on despite flakes.
