Skip to content

save_main_session not working in DoFn.setup() in Dataflow with Python #20248

@damccorm

Description

@damccorm

I have a Dataflow pipeline that calls the DLP API in GCP. The relevant pieces of my code are:


from google.cloud.dlp import DlpServiceClient
def run(argv):
  """Main entry point; defines and runs
the pipeline."""
  opts = HashPipelineOptions()
  opts.view_as(SetupOptions).save_main_session = True

 std_opts = opts.view_as(StandardOptions)
  std_opts.streaming = True
...
class DlpFindingDoFn(beam.DoFn):
  """Fetch DLP Findings as a PCollection.

  Each input element is unpacked as a ``(filename, chunk)`` pair; the chunk is
  sent to the DLP ``inspect_content`` call and one ``(filename, quote)`` pair
  is yielded per finding in the response.
  """

  def __init__(self, project, runner):
    """Store the configuration used by ``process``.

    Args:
      project: project id, later expanded with ``project_path``.
      runner: runner name; compared against ``'DirectRunner'`` in ``process``.
    """
    beam.DoFn.__init__(self)
    self.project = project
    self.runner = runner
    self.inspect_config = {
      'info_types': [{'name': 'US_SOCIAL_SECURITY_NUMBER'}],
      'min_likelihood': 'VERY_UNLIKELY',
      'include_quote': True # We need the output to match against the KV Store
    }

  def setup(self):
    """Create the DLP client used by ``process``."""
    # NOTE: DlpServiceClient is resolved from the module-level import here;
    # this is the statement the reported NameError is raised from.
    self.dlp_client = DlpServiceClient()

  def process(self, element):
    """Inspect one ``(filename, chunk)`` element and yield its findings.

    Yields:
      ``(filename, f.quote)`` for every finding ``f`` in the response.
    """
    # TODO: Remove when version 2.22.0 is released BEAM-7885
    # Workaround: setup() is invoked by hand for every element when running on
    # the DirectRunner (presumably because that runner does not call setup()
    # itself before 2.22.0 -- see BEAM-7885 to confirm).
    if self.runner == 'DirectRunner':
      self.setup()
    # Convert the project id into a full resource id.
    parent = self.dlp_client.project_path(self.project)
    filename, chunk = element
    # Call the API.
    response = self.dlp_client.inspect_content(parent, self.inspect_config,
                                               {'value': chunk})
    if response.result.findings:
      for f in response.result.findings:
        yield (filename, f.quote)

 

 

This runs just fine locally, but when I run it in Dataflow I get this NameError because the class DlpServiceClient cannot be found.


2020-05-29 06:29:19.799 PDT Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException:
Error received from SDK harness for instruction -2659: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 368, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop
from empty list During handling of the above exception, another exception occurred: Traceback (most
recent call last): File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
File "hashpipeline.py", line 108, in setup NameError: name 'DlpServiceClient' is not defined During
handling of the above exception, another exception occurred: Traceback (most recent call last): File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
response = task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 471, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 500, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 374, in get self.data_channel_factory) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 843, in __init__ op.setup() File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/worker/operations.py", line 660, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/common.py", line 1010, in apache_beam.runners.common.DoFnRunner.setup File
"apache_beam/runners/common.py", line 1006, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
File "hashpipeline.py", line 108, in setup NameError: name 'DlpServiceClient' is not defined [while
running 'generatedPtransform-2651'] java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1363)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:153)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1086)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error received from SDK
harness for instruction -2659: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 368, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop
from empty list During handling of the above exception, another exception occurred: Traceback (most
recent call last): File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
File "hashpipeline.py", line 108, in setup NameError: name 'DlpServiceClient' is not defined During
handling of the above exception, another exception occurred: Traceback (most recent call last): File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
response = task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 471, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 500, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 374, in get self.data_channel_factory) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 843, in __init__ op.setup() File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/worker/operations.py", line 660, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/common.py", line 1010, in apache_beam.runners.common.DoFnRunner.setup File
"apache_beam/runners/common.py", line 1006, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
File "hashpipeline.py", line 108, in setup NameError: name 'DlpServiceClient' is not defined [while
running 'generatedPtransform-2651'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:178)
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:158)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

However, when I move the import line inside the setup function, everything works fine.

Imported from Jira BEAM-10150. Original Jira may contain additional context.
Reported by: onetwopunch.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions