Skip to content

Commit

Permalink
Updates to the python SDK to use the new import structure created by …
Browse files Browse the repository at this point in the history
…gen_protos
  • Loading branch information
thempatel committed Apr 2, 2022
1 parent 35d947c commit 23ce537
Show file tree
Hide file tree
Showing 23 changed files with 370 additions and 325 deletions.
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2272,14 +2272,14 @@ def serialize(side_inputs):
@PTransform.register_urn('beam:transform:write_to_big_query:v0', bytes)
def from_runner_api(unused_ptransform, payload, context):
from apache_beam.internal import pickler
from apache_beam.portability.api.beam_runner_api_pb2 import SideInput
from apache_beam.portability.api import beam_runner_api_pb2

config = pickler.loads(payload)

def deserialize(side_inputs):
deserialized_side_inputs = {}
for k, v in side_inputs.items():
side_input = SideInput()
side_input = beam_runner_api_pb2.SideInput()
side_input.ParseFromString(v)
deserialized_side_inputs[k] = side_input

Expand Down
21 changes: 0 additions & 21 deletions sdks/python/apache_beam/portability/api/__init__.py

This file was deleted.

43 changes: 24 additions & 19 deletions sdks/python/apache_beam/portability/common_urns.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,30 @@

# pytype: skip-file

from apache_beam.portability.api.beam_runner_api_pb2_urns import BeamConstants
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardArtifacts
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardCoders
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardDisplayData
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardEnvironments
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardProtocols
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardPTransforms
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardRequirements
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardResourceHints
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardSideInputTypes
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardUserStateTypes
from apache_beam.portability.api.external_transforms_pb2_urns import ExpansionMethods
from apache_beam.portability.api.metrics_pb2_urns import MonitoringInfo
from apache_beam.portability.api.metrics_pb2_urns import MonitoringInfoSpecs
from apache_beam.portability.api.metrics_pb2_urns import MonitoringInfoTypeUrns
from apache_beam.portability.api.standard_window_fns_pb2_urns import FixedWindowsPayload
from apache_beam.portability.api.standard_window_fns_pb2_urns import GlobalWindowsPayload
from apache_beam.portability.api.standard_window_fns_pb2_urns import SessionWindowsPayload
from apache_beam.portability.api.standard_window_fns_pb2_urns import SlidingWindowsPayload
from .api import beam_runner_api_pb2_urns
from .api import external_transforms_pb2_urns
from .api import metrics_pb2_urns
from .api import standard_window_fns_pb2_urns

BeamConstants = beam_runner_api_pb2_urns.BeamConstants
StandardArtifacts = beam_runner_api_pb2_urns.StandardArtifacts
StandardCoders = beam_runner_api_pb2_urns.StandardCoders
StandardDisplayData = beam_runner_api_pb2_urns.StandardDisplayData
StandardEnvironments = beam_runner_api_pb2_urns.StandardEnvironments
StandardProtocols = beam_runner_api_pb2_urns.StandardProtocols
StandardPTransforms = beam_runner_api_pb2_urns.StandardPTransforms
StandardRequirements = beam_runner_api_pb2_urns.StandardRequirements
StandardResourceHints = beam_runner_api_pb2_urns.StandardResourceHints
StandardSideInputTypes = beam_runner_api_pb2_urns.StandardSideInputTypes
StandardUserStateTypes = beam_runner_api_pb2_urns.StandardUserStateTypes
ExpansionMethods = external_transforms_pb2_urns.ExpansionMethods
MonitoringInfo = metrics_pb2_urns.MonitoringInfo
MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs
MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns
FixedWindowsPayload = standard_window_fns_pb2_urns.FixedWindowsPayload
GlobalWindowsPayload = standard_window_fns_pb2_urns.GlobalWindowsPayload
SessionWindowsPayload = standard_window_fns_pb2_urns.SessionWindowsPayload
SlidingWindowsPayload = standard_window_fns_pb2_urns.SlidingWindowsPayload

primitives = StandardPTransforms.Primitives
deprecated_primitives = StandardPTransforms.DeprecatedPrimitives
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@

import apache_beam as beam
from apache_beam import coders
from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileHeader
from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
from apache_beam.portability.api import beam_interactive_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.interactive.cache_manager import CacheManager
from apache_beam.runners.interactive.cache_manager import SafeFastPrimitivesCoder
from apache_beam.runners.interactive.caching.cacheable import CacheKey
Expand Down Expand Up @@ -201,7 +200,10 @@ def _emit_from_file(self, fh, tail):
# The first line at pos = 0 is always the header. Read the line without
# the new line.
to_decode = line[:-1]
proto_cls = TestStreamFileHeader if pos == 0 else TestStreamFileRecord
if pos == 0:
proto_cls = beam_interactive_api_pb2.TestStreamFileHeader
else:
proto_cls = beam_interactive_api_pb2.TestStreamFileRecord
msg = self._try_parse_as(proto_cls, to_decode)
if msg:
yield msg
Expand Down Expand Up @@ -343,7 +345,9 @@ def write(self, values, *labels):
os.makedirs(directory)
with open(filepath, 'ab') as f:
for v in values:
if isinstance(v, (TestStreamFileHeader, TestStreamFileRecord)):
if isinstance(v,
(beam_interactive_api_pb2.TestStreamFileHeader,
beam_interactive_api_pb2.TestStreamFileRecord)):
val = v.SerializeToString()
else:
raise TypeError(
Expand Down Expand Up @@ -531,8 +535,8 @@ def _advance_processing_time(self, new_timestamp):
"""Advances the internal clock and returns an AdvanceProcessingTime event.
"""
advancy_by = new_timestamp.micros - self._monotonic_clock.micros
e = TestStreamPayload.Event(
processing_time_event=TestStreamPayload.Event.AdvanceProcessingTime(
advance_duration=advancy_by))
e = beam_runner_api_pb2.TestStreamPayload.Event(
processing_time_event=beam_runner_api_pb2.TestStreamPayload.Event.
AdvanceProcessingTime(advance_duration=advancy_by))
self._monotonic_clock = new_timestamp
return e
Loading

0 comments on commit 23ce537

Please sign in to comment.