Python Direct Runner doesn't support both streaming & non streaming sources #21103

Open
damccorm opened this issue Jun 4, 2022 · 10 comments
@damccorm
Contributor

damccorm commented Jun 4, 2022

Please see Stack Overflow discussion:

https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir

When I create a GCS source and a Pub/Sub source and try to flatten both, the pipeline fails because of an incompatible transform replacement performed by the direct runner.

Code example:


gcsEventsColl = p | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log") \
                  | 'convert to dict' >> beam.Map(lambda x: json.loads(x))

liveEventsColl = p | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic") \
                   | 'convert to dict2' >> beam.Map(lambda x: json.loads(x))

input_rec = (gcsEventsColl, liveEventsColl) | 'flatten' >> beam.Flatten()

Error:


  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
    self._check_replacement(override)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
    self.visit(ReplacementValidator())
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  [Previous line repeated 4 more times]
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
    transform_node)
RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.

The direct runner corrupts the pipeline when it rewrites the transforms.
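
For readers unfamiliar with the failure mode, the traceback shows a replacement pass (`replace_all`) followed by a validation walk (`_check_replacement`, `visit_transform`) that raises when a node still matches an override. The following is only a toy model of that general pattern in plain Python, not Beam's code: `Node`, `Override`, `replace_shallow`, the label `Read from GCS/inner/GroupByKey`, and the kind `ToyStreamingGroupByKey` are all hypothetical names invented for illustration. The root cause in Beam itself is not established in this thread.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Node:
    """Toy stand-in for an applied transform (not Beam's AppliedPTransform)."""
    label: str
    kind: str
    parts: List["Node"] = field(default_factory=list)


@dataclass
class Override:
    """Toy override: which nodes to match and what kind to swap in."""
    matches: Callable[[Node], bool]
    new_kind: str


def replace_shallow(root: Node, override: Override) -> None:
    """Deliberately incomplete pass: rewrites only the root's direct children."""
    for part in root.parts:
        if override.matches(part):
            part.kind = override.new_kind


def check_replacement(node: Node, override: Override) -> None:
    """Walk the whole tree and fail if any node still matches the override."""
    if override.matches(node):
        raise RuntimeError(
            "Transform node %s (%s) was not replaced as expected."
            % (node.label, node.kind))
    for part in node.parts:
        check_replacement(part, override)


def build_toy_pipeline() -> Node:
    """Build a tree with one top-level and one nested matching node."""
    nested_gbk = Node("Read from GCS/inner/GroupByKey", "_GroupByKeyOnly")
    read = Node("Read from GCS", "Read", [nested_gbk])
    top_gbk = Node("GroupByKey", "_GroupByKeyOnly")
    return Node("root", "Root", [read, top_gbk])


if __name__ == "__main__":
    toy_override = Override(
        matches=lambda n: n.kind == "_GroupByKeyOnly",
        new_kind="ToyStreamingGroupByKey",  # hypothetical replacement kind
    )
    pipeline = build_toy_pipeline()
    replace_shallow(pipeline, toy_override)
    try:
        check_replacement(pipeline, toy_override)
    except RuntimeError as err:
        print(err)
```

In this toy, the top-level node is rewritten but the nested one is missed, so the validation walk raises. Whether the direct runner misses a nested node in a comparable way is an assumption, not something confirmed here.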

 

Imported from Jira BEAM-12586. Original Jira may contain additional context.
Reported by: rodriguezc.

@jamesandreou

Hello. We are currently experiencing this issue as well when trying to use beam.Flatten() on a historical PCollection from BigQuery and a streaming PCollection from Pub/Sub.

Has anyone found a temporary workaround?

@tvalentyn
Contributor

@jamesandreou would an in-process Flink runner work for you?

# (in a separate terminal)
docker run --net=host apache/beam_flink1.11_job_server:latest

python -m your_pipeline --runner PortableRunner --job_endpoint="localhost:8099" --environment_type="LOOPBACK" --streaming
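
If it is more convenient to set these flags in code than on the command line, a small helper can assemble them as an argv-style list. This is a minimal sketch: `portable_runner_flags` is a hypothetical helper name, and the only flags it emits are the ones shown in the command above.

```python
from typing import List


def portable_runner_flags(job_endpoint: str = "localhost:8099",
                          streaming: bool = True) -> List[str]:
    """Return the pipeline flags from the command above as an argv-style list."""
    flags = [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        "--environment_type=LOOPBACK",
    ]
    # Streaming mode is optional so the same helper can serve batch test runs.
    if streaming:
        flags.append("--streaming")
    return flags


if __name__ == "__main__":
    print(" ".join(portable_runner_flags()))
```

The resulting list is intended to be passed wherever the pipeline accepts its arguments, for example `beam.Pipeline(argv=portable_runner_flags())`. It still assumes the job server from the `docker run` step is already listening on the given endpoint.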

@krishnap

krishnap commented Aug 2, 2022

Any update on this?

@tvalentyn
Contributor

I don't think there has been significant work on the Python streaming direct runner recently.

@ptrmcrthr

We are running into this issue while trying to implement a slowly changing side input, as shown here: https://beam.apache.org/documentation/patterns/side-inputs/

Could that page get a note saying the pattern does not work with the DirectRunner? Unfortunately, my pipeline does not work with the Flink runner.

@tvalentyn
Contributor

@damccorm is working on a fix for the PeriodicImpulse transform that may help with that pattern. I'm not sure it will work with the DirectRunner, though, as that runner has other limitations.

@tvalentyn
Contributor

@BjornPrime - when you document the direct runner's streaming limitations, please incorporate #21103 (comment)

@precabal

Any update on this?

@paulleroyza

paulleroyza commented Aug 19, 2023

This is also affecting my pipeline, snippet below:

  with beam.Pipeline(argv=pipeline_args) as pipeline:
    send_data = (pipeline | "Read Parquet" >> beam.io.ReadFromParquet(known_args.source)
                          | "Write to PubSub" >> beam.io.WriteToPubSub(topic=known_args.topic)
                )

@franklin113

Are there any workarounds for this? Using PeriodicImpulse for updating side inputs in the DirectRunner throws this error in my streaming pipeline.
