Skip to content

Commit

Permalink
Merge pull request #6512: [BEAM-5443] Pipeline option defaults for po…
Browse files Browse the repository at this point in the history
…rtable runner.
  • Loading branch information
tweise committed Sep 29, 2018
2 parents bf69c25 + d3e6bab commit 766a1dc
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 10 deletions.
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from apache_beam import pvalue
from apache_beam.internal import pickler
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
Expand Down Expand Up @@ -152,6 +153,14 @@ def __init__(self, runner=None, options=None, argv=None):
raise ValueError(
'Pipeline has validations errors: \n' + '\n'.join(errors))

# set default experiments for portable runner
# (needs to occur prior to pipeline construction)
if self._options.view_as(StandardOptions).runner == 'PortableRunner':
experiments = (self._options.view_as(DebugOptions).experiments or [])
if not 'beam_fn_api' in experiments:
experiments.append('beam_fn_api')
self._options.view_as(DebugOptions).experiments = experiments

# Default runner to be used.
self.runner = runner
# Stack of transforms generated by nested apply() calls. The stack will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import apache_beam as beam
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.portability import portable_runner
from apache_beam.runners.portability import portable_runner_test
Expand Down Expand Up @@ -66,7 +65,6 @@ def get_runner(cls):
def create_options(self):
options = super(FlinkRunnerTest, self).create_options()
options.view_as(DebugOptions).experiments = ['beam_fn_api']
options.view_as(SetupOptions).sdk_location = 'container'
if streaming:
options.view_as(StandardOptions).streaming = True
return options
Expand All @@ -80,6 +78,9 @@ def test_read(self):
def test_no_subtransform_composite(self):
raise unittest.SkipTest("BEAM-4781")

def test_pardo_state_only(self):
raise unittest.SkipTest("BEAM-2918 - User state not yet supported.")

# Inherits all other tests.

# Run the tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,6 @@ def cross_product(elem, sides):
equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))

def test_pardo_state_only(self):
p = self.create_pipeline()
if not isinstance(p.runner, fn_api_runner.FnApiRunner):
# test is inherited by Flink PVR, which does not support the feature yet
self.skipTest('User state not supported.')

index_state_spec = userstate.CombiningValueStateSpec(
'index', beam.coders.VarIntCoder(), sum)

Expand All @@ -265,7 +260,7 @@ def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
('B', 'b', 2),
('B', 'b', 3)]

with p:
with self.create_pipeline() as p:
assert_that(p | beam.Create(inputs) | beam.ParDo(AddIndex()),
equal_to(expected))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from apache_beam import metrics
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_job_api_pb2_grpc
Expand Down Expand Up @@ -104,6 +105,12 @@ def _create_environment(options):
def run_pipeline(self, pipeline):
portable_options = pipeline.options.view_as(PortableOptions)
job_endpoint = portable_options.job_endpoint

# TODO: https://issues.apache.org/jira/browse/BEAM-5525
# portable runner specific default
if pipeline.options.view_as(SetupOptions).sdk_location == 'default':
pipeline.options.view_as(SetupOptions).sdk_location = 'container'

if not job_endpoint:
docker = DockerizedJobServer()
job_endpoint = docker.start()
Expand Down
2 changes: 0 additions & 2 deletions sdks/python/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,7 @@ task portableWordCount(dependsOn: 'installGcpTest') {
def options = [
"--input=/etc/profile",
"--output=/tmp/py-wordcount-direct",
"--experiments=beam_fn_api",
"--runner=PortableRunner",
"--sdk_location=container",
]
if (project.hasProperty("streaming"))
options += ["--streaming"]
Expand Down

0 comments on commit 766a1dc

Please sign in to comment.