Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-1139] Add no-arg constructor for UnboundedReadFromBoundedSource #1586

Closed
wants to merge 2 commits into from

Conversation

tgroh
Copy link
Member

@tgroh tgroh commented Dec 12, 2016

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

This allows Kryo to work with the type, currently required by
the Apex runner.

@tgroh
Copy link
Member Author

tgroh commented Dec 13, 2016

My current exception

java.lang.InstantiationError: org.apache.beam.sdk.io.UnboundedSource

	at org.apache.beam.sdk.io.UnboundedSourceConstructorAccess.newInstance(Unknown Source)
	at com.esotericsoftware.kryo.Kryo$1.newInstance(Kryo.java:1015)
	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
	at org.apache.beam.runners.core.UnboundedReadFromBoundedSourceTest.testAdapterKryoSerialization(UnboundedReadFromBoundedSourceTest.java:143)

@tgroh
Copy link
Member Author

tgroh commented Dec 13, 2016

I get that with a public or private no-arg constructor, with a final or non-final class, and with a final or non-final boundedSource field.

This allows Kryo to work with the type, currently required by
the Apex runner.
@tgroh
Copy link
Member Author

tgroh commented Dec 13, 2016

R: @tweise @lukecwik

The serialization test passes in the DirectRunner Module locally.

@kennknowles
Copy link
Member

kennknowles commented Dec 13, 2016

Confirmed the fix for just the error in question manually, since Jenkins is severely backlogged.

mvn -DskipTests clean install -P apex-runner -pl examples/java -am

mvn --errors failsafe:integration-test@apex-runner-integration-tests \
    -pl examples/java \
    -P apex-runner \
    -P jenkins-precommit \
    -D test=WordCountIT \
    -D beamTestPipelineOptions='[
      "--runner=org.apache.beam.runners.apex.TestApexRunner",
      "--project=...",
      "--tempRoot=..."
    ]'

(deliberately trying to sprinkle knowledge and record of these incantations here and there)

<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.21</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please do not put versions in submodules -- top module only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed at all -- isn't it picked up from the fact that this depends on runner-core?

@davorbonaci
Copy link
Member

Can we investigate alternatives that don't involve kryo proliferation? I'd ask to please hold the PR in the meanwhile.

@kennknowles
Copy link
Member

@davorbonaci this is precommit breakage

<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.21</version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unfortunate to need a version here. Do put the version into dependencyManagement regardless.

The trouble, for anyone looking at this line, is that the Spark runner and Apex runner pu
ll in Kryo at different versions. We could choose a version for Beam just for common style, but it would require overriding in one or the other, or both.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There shoudn't be any Kryo dependency in runners core. If at all, then this test should be in the Apex runner.

@lukecwik
Copy link
Member

@davorbonaci There is currently no hard specification as to how objects need to be serialized. Kryo proliferation comes from runners relying on Kryo as their defacto serialization mechanism.

@dhalperi
Copy link
Contributor

But Spark, for example, serializes sources and coders using java serialization as is expected today by Beam.

@tweise
Copy link
Contributor

tweise commented Dec 13, 2016

I'm going to look at it tonight. I believe Java serialization can be used for this reference in the runner also.

@kennknowles
Copy link
Member

Thanks @tweise! Agree that we should hold off on a quick fix, then, if you think you can just avoid the whole issue. It will save us debating the relationship of Kryo and Beam :-)

@davorbonaci
Copy link
Member

davorbonaci commented Dec 13, 2016

@tgroh seemed to have reduced scope of the change. With that, LGTM. Good for self-merge.

@tweise
Copy link
Contributor

tweise commented Dec 13, 2016

We can potentially improve it later to. Generally, if the class is Java serializable, then use

@Bind(JavaSerializer.class)

in the Apex operator code.

@asfgit asfgit closed this in 91cc606 Dec 13, 2016
@asfbot
Copy link

asfbot commented Dec 13, 2016

@asfbot
Copy link

asfbot commented Dec 13, 2016

@asfbot
Copy link

asfbot commented Dec 13, 2016

@asfbot
Copy link

asfbot commented Dec 13, 2016

@tweise
Copy link
Contributor

tweise commented Dec 13, 2016

I'm looking at it, not sure what the issue is, the field is marked for Java serialization in master:

Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.apache.beam.sdk.io.CompressedSource
Serialization trace:
boundedSource (org.apache.beam.runners.core.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter)
source (org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator)
	at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1050)
  @Bind(JavaSerializer.class)
  private final UnboundedSource<OutputT, CheckpointMarkT> source;

There should be no Kryo exception.

@tweise
Copy link
Contributor

tweise commented Dec 13, 2016

I notice that the @Bind annotation that controls the serializer is only present in Kryo 2.24 (used by Apex), not in 2.21. Is there a dependency leak in the examples module?

@dhalperi
Copy link
Contributor

dhalperi commented Dec 13, 2016 via email

kennknowles added a commit to kennknowles/beam that referenced this pull request Dec 13, 2016
This commits merged the following commits:

47cc2dc Add Tests for Kryo Serialization of URFBS
3c2e550 Add no-arg constructor for UnboundedReadFromBoundedSource

Kryo support is, in fact, not needed for UnboundedReadFromBoundedSource,
but we have a dependency issue where the Apex runner's dependency on Kryo
was being overridden with an older version that does not support the
@Bind annotations.

All runners and poms are independently correct except the examples pom
which merges the dependencies from all.
kennknowles added a commit to kennknowles/beam that referenced this pull request Dec 13, 2016
The above commit was a merge of the following commits to master:

47cc2dc Add Tests for Kryo Serialization of URFBS
3c2e550 Add no-arg constructor for UnboundedReadFromBoundedSource

Kryo support is, in fact, not needed for UnboundedReadFromBoundedSource,
but we have a dependency issue where the Apex runner's dependency on Kryo
was being overridden with an older version that does not support the
@Bind annotations.

All runners and poms are independently correct except the examples pom
which merges the dependencies from all.
mizitch pushed a commit to mizitch/incubator-beam that referenced this pull request Dec 13, 2016
The above commit was a merge of the following commits to master:

47cc2dc Add Tests for Kryo Serialization of URFBS
3c2e550 Add no-arg constructor for UnboundedReadFromBoundedSource

Kryo support is, in fact, not needed for UnboundedReadFromBoundedSource,
but we have a dependency issue where the Apex runner's dependency on Kryo
was being overridden with an older version that does not support the
@Bind annotations.

All runners and poms are independently correct except the examples pom
which merges the dependencies from all.
asfgit pushed a commit that referenced this pull request Dec 21, 2016
Adjustments in gearpump-runner:

  [BEAM-79] Upgrade to beam-0.5.0-incubating-SNAPSHOT
  [BEAM-79] Update to latest Gearpump API

From master:

  Disable automatic archiving of Maven builds
  [BEAM-59] initial interfaces and classes of Beam FileSystem.
  Change counter name in TestDataflowRunner
  More escaping in Jenkins timestamp spec
  Add RunnableOnService test for Metrics
  Fix seed job fetch spec
  Show timestamps on log lines in Jenkins
  [BEAM-1165] Fix unexpected file creation when checking dependencies
  [BEAM-1178] Make naming of logger objects consistent
  [BEAM-716] Fix javadoc on with* methods [BEAM-959] Improve check preconditions in JmsIO
  [BEAM-716] Use AutoValue in JmsIO
  Fix grammar error (repeated for)
  Empty TestPipeline need not be run
  [BEAM-85, BEAM-298] Make TestPipeline a JUnit Rule checking proper usage
  Change counter name in TestDataflowRunner
  BigQueryIO: fix streaming write, typo in API
  [BEAM-853] Force streaming execution on batch pipelines for testing. Expose the adapted source.
  Use empty SideInputReader, fixes NPE in SimpleDoFnRunnerTest
  Test that SimpleDoFnRunner wraps exceptions in startBundle and finishBundle
  Add timer support to DoFnRunner(s)
  Make TimerSpec and StateSpec fields accessible
  View.asMap: minor javadoc fixes
  Revert "Move InMemoryTimerInternals to runners-core"
  Revert "Moves DoFnAdapters to runners-core"
  Revert "Removes ArgumentProvider.windowingInternals"
  Revert "Removes code for wrapping DoFn as an OldDoFn"
  checkstyle: missed newline in DistributionCell
  Make {Metric,Counter,Distribution}Cell public
  Add PTransformOverrideFactory to the Core SDK
  Move ActiveWindowSet and implementations to runners-core
  Update Dataflow worker to beam-master-20161216
  [BEAM-1108] Remove outdated language about experimental autoscaling
  [BEAM-450] Shade modules to separate paths
  [BEAM-362] Port runners to runners-core AggregatoryFactory
  Move InMemoryTimerInternals to runners-core
  Delete deprecated TimerCallback
  Remove deprecated methods of InMemoryTimerInternals
  Don't incorrectly log error in MetricsEnvironment
  Renames ParDo.getNewFn to getFn
  Moves DoFnAdapters to runners-core
  Removes unused code from NoOpOldDoFn
  Removes ArgumentProvider.windowingInternals
  Removes code for wrapping DoFn as an OldDoFn
  Removes OldDoFn from ParDo
  Pushes uses of OldDoFn deeper inside Flink runner
  Remove ParDo.of(OldDoFn) from Apex runner
  Converts all easy OldDoFns to DoFn
  [BEAM-1022] Add testing coverage for BigQuery streaming writes
  Fix mvn command args in Apex postcommit Jenkins job
  [BEAM-932] Enable findbugs validation (and fix existing issues)
  Fail to split in FileBasedSource if filePattern expands to empty.
  [BEAM-1154] Get side input from proper window in ReduceFn
  [BEAM-1153] GcsUtil: use non-batch API for single file size requests.
  Fix NPE in StatefulParDoEvaluatorFactoryTest mocking
  [BEAM-1033] Retry Bigquery Verifier when Query Fails
  Implement GetDefaultOutputCoder in DirectGroupByKey
  SimpleDoFnRunner observes window if SideInputReader is nonempty
  Better comments and cleanup
  Allow empty string value for ValueProvider types.
  starter: fix typo in pom.xml
  Revert "Allow stateful DoFn in DataflowRunner"
  Re-exclude UsesStatefulParDo tests for Dataflow
  Some minor changes and fixes for sorter module
  [BEAM-1149] Explode windows when fn uses side inputs
  Add Jenkins postcommit for RunnableOnService in Apex runner
  Update version from 0.5.0-SNAPSHOT to 0.5.0-incubating-SNAPSHOT
  Update Maven Archetype versions after cutting the release branch
  Move PerKeyCombineFnRunner to runners-core
  Update Dataflow worker to beam-master-20161212
  [maven-release-plugin] prepare for next development iteration
  [maven-release-plugin] prepare branch release-0.4.0-incubating
  Fix version of Kryo in examples/java jenkins-precommit profile
  Revert 91cc606 "This closes #1586": Kryo + UBRFBS
  [BEAM-909] improve starter archetype
  Fix JDom malformed comment in Apex runner.
  [BEAM-927] Fix findbugs and re-enable Maven plugin in JmsIO
  [BEAM-807] Replace OldDoFn with DoFn.
  [BEAM-757] Use DoFnRunner in the implementation of DoFn via FlatMapFunction.
  FileBasedSinkTest: fix tests in Windows OS by using IOChannelUtils.resolve().
  FileBasedSink: ignore exceptions when removing temp output files for issues in Windows OS.
  [BEAM-1142] Upgrade maven-invoker to address maven bug ARCHETYPE-488.
  Add Tests for Kryo Serialization of URFBS
  Add no-arg constructor for UnboundedReadFromBoundedSource
  Revise WindowedWordCount for runner and execution mode portability
  Factor out ShardedFile from FileChecksumMatcher
  Add IntervalWindow coder to the standard registry
  Stop expanding PValues in DirectRunner visitors
  Migrate AppliedPTransform to use AutoValue
  Enable and fix DirectRunnerTest case missing @test
  [BEAM-1130] SparkRunner ResumeFromCheckpointStreamingTest Failing.
  [BEAM-1133] Add maxNumRecords per micro-batch for Spark runner options.
  BigQueryIO.Write: support runtime schema and table
  Fix handling of null ValueProviders in DisplayData
  [BEAM-551] Fix handling of default for VP
  [BEAM-1120] Move some DataflowRunner configurations from code to properties
  [BEAM-551] Fix toString for FileBasedSource
  [BEAM-921] spark-runner: register sources and coders to serialize with java serializer
  [BEAM-551] Fix handling of TextIO.Sink
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants