
[WIP] Run com2ann on Python SDK #17109

Closed · wants to merge 7 commits
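com2ann (https://github.com/ilevkivskyi/com2ann) mechanically rewrites PEP 484 type comments into real annotations (PEP 3107 for signatures, PEP 526 for variables). As a rough sketch of the rewrite this PR applies across the SDK (the `pad` helper below is hypothetical, not Beam code):

from typing import List


# Before: type comments, the only option when the code had to parse on Python 2.
def pad(values, width=0):
  # type: (List[str], int) -> List[str]
  return [v.ljust(width) for v in values]


# After: the same signature as inline annotations; runtime behavior is unchanged.
def pad(values: List[str], width: int = 0) -> List[str]:
  return [v.ljust(width) for v in values]
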
288 changes: 125 additions & 163 deletions sdks/python/apache_beam/coders/coder_impl.py

Large diffs are not rendered by default.

279 changes: 108 additions & 171 deletions sdks/python/apache_beam/coders/coders.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion sdks/python/apache_beam/coders/observable_test.py
@@ -17,6 +17,7 @@

 """Tests for the Observable mixin class."""
 # pytype: skip-file
+from __future__ import annotations

 import logging
 import unittest
@@ -29,7 +30,7 @@
 class ObservableMixinTest(unittest.TestCase):
   observed_count = 0
   observed_sum = 0
-  observed_keys = []  # type: List[Optional[str]]
+  observed_keys: List[Optional[str]] = []

   def observer(self, value, key=None):
     self.observed_count += 1
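One runtime-visible difference from the comment form: PEP 526 class-level annotations are recorded in `__annotations__`, whereas type comments were invisible to the interpreter. A minimal sketch, with a hypothetical `Tracker` standing in for the test class above:

from __future__ import annotations

from typing import List, Optional


class Tracker:
  observed_count = 0  # plain assignment: not recorded
  observed_keys: List[Optional[str]] = []  # annotated: recorded


print(Tracker.__annotations__)  # {'observed_keys': 'List[Optional[str]]'}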
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/coders/row_coder.py
@@ -16,6 +16,7 @@
 #

 # pytype: skip-file
+from __future__ import annotations

 from apache_beam.coders import typecoders
 from apache_beam.coders.coder_impl import LogicalTypeCoderImpl
@@ -107,8 +108,7 @@ def from_type_hint(cls, type_hint, registry):
     return cls(schema)

   @staticmethod
-  def from_payload(payload):
-    # type: (bytes) -> RowCoder
+  def from_payload(payload: bytes) -> RowCoder:
     return RowCoder(proto_utils.parse_Bytes(payload, schema_pb2.Schema))

   def __reduce__(self):
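The new `-> RowCoder` annotation names the class from inside its own body. That only works because the file now starts with `from __future__ import annotations` (PEP 563), which stores annotations as unevaluated strings instead of resolving them at definition time. A minimal sketch with a hypothetical `Widget` class:

from __future__ import annotations


class Widget:
  # Without the __future__ import, evaluating `-> Widget` at definition time
  # would raise NameError: the name is only bound once the class body ends.
  @staticmethod
  def parse(payload: bytes) -> Widget:
    return Widget()


print(Widget.parse.__annotations__)  # {'payload': 'bytes', 'return': 'Widget'}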
33 changes: 12 additions & 21 deletions sdks/python/apache_beam/coders/slow_stream.py
@@ -20,6 +20,7 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 # pytype: skip-file
+from __future__ import annotations

 import struct
 from typing import List
@@ -30,11 +31,10 @@ class OutputStream(object):

   A pure Python implementation of stream.OutputStream."""
   def __init__(self):
-    self.data = []  # type: List[bytes]
+    self.data: List[bytes] = []
     self.byte_count = 0

-  def write(self, b, nested=False):
-    # type: (bytes, bool) -> None
+  def write(self, b: bytes, nested: bool = False) -> None:
     assert isinstance(b, bytes)
     if nested:
       self.write_var_int64(len(b))
@@ -45,8 +45,7 @@ def write_byte(self, val):
     self.data.append(chr(val).encode('latin-1'))
     self.byte_count += 1

-  def write_var_int64(self, v):
-    # type: (int) -> None
+  def write_var_int64(self, v: int) -> None:
     if v < 0:
       v += 1 << 64
       if v <= 0:
@@ -72,16 +71,13 @@ def write_bigendian_int32(self, v):
   def write_bigendian_double(self, v):
     self.write(struct.pack('>d', v))

-  def get(self):
-    # type: () -> bytes
+  def get(self) -> bytes:
     return b''.join(self.data)

-  def size(self):
-    # type: () -> int
+  def size(self) -> int:
     return self.byte_count

-  def _clear(self):
-    # type: () -> None
+  def _clear(self) -> None:
     self.data = []
     self.byte_count = 0

@@ -95,8 +91,7 @@ def __init__(self):
     super().__init__()
     self.count = 0

-  def write(self, byte_array, nested=False):
-    # type: (bytes, bool) -> None
+  def write(self, byte_array: bytes, nested: bool = False) -> None:
     blen = len(byte_array)
     if nested:
       self.write_var_int64(blen)
@@ -119,25 +114,21 @@ class InputStream(object):
   """For internal use only; no backwards-compatibility guarantees.

   A pure Python implementation of stream.InputStream."""
-  def __init__(self, data):
-    # type: (bytes) -> None
+  def __init__(self, data: bytes) -> None:
     self.data = data
     self.pos = 0

   def size(self):
     return len(self.data) - self.pos

-  def read(self, size):
-    # type: (int) -> bytes
+  def read(self, size: int) -> bytes:
     self.pos += size
     return self.data[self.pos - size:self.pos]

-  def read_all(self, nested):
-    # type: (bool) -> bytes
+  def read_all(self, nested: bool) -> bytes:
     return self.read(self.read_var_int64() if nested else self.size())

-  def read_byte(self):
-    # type: () -> int
+  def read_byte(self) -> int:
     self.pos += 1
     return self.data[self.pos - 1]
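For orientation, a quick round trip through the streams edited above; this assumes an environment where `apache_beam` is importable and these pure-Python classes keep the signatures shown in the diff:

from apache_beam.coders.slow_stream import InputStream, OutputStream

out = OutputStream()
out.write_var_int64(-1)  # negative values wrap into 64-bit two's complement
out.write(b'payload', nested=True)  # nested writes are length-prefixed
data = out.get()

inp = InputStream(data)
assert inp.read_var_int64() == -1
assert inp.read_all(nested=True) == b'payload'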
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/coders/standard_coders_test.py
@@ -18,6 +18,7 @@
 """Unit tests for coders that must be consistent across all Beam SDKs.
 """
 # pytype: skip-file
+from __future__ import annotations

 import json
 import logging
@@ -275,7 +276,7 @@ def json_value_parser(self, coder_spec):
   # Used when --fix is passed.

   fix = False
-  to_fix = {}  # type: Dict[Tuple[int, bytes], bytes]
+  to_fix: Dict[Tuple[int, bytes], bytes] = {}

   @classmethod
   def tearDownClass(cls):
21 changes: 11 additions & 10 deletions sdks/python/apache_beam/coders/typecoders.py
@@ -65,6 +65,7 @@ def MakeXyzs(v):
 """

 # pytype: skip-file
+from __future__ import annotations
 from typing import Any
 from typing import Dict
 from typing import Iterable
@@ -80,8 +81,8 @@ def MakeXyzs(v):
 class CoderRegistry(object):
   """A coder registry for typehint/coder associations."""
   def __init__(self, fallback_coder=None):
-    self._coders = {}  # type: Dict[Any, Type[coders.Coder]]
-    self.custom_types = []  # type: List[Any]
+    self._coders: Dict[Any, Type[coders.Coder]] = {}
+    self.custom_types: List[Any] = []
     self.register_standard_coders(fallback_coder)

   def register_standard_coders(self, fallback_coder):
@@ -104,12 +105,14 @@ def register_standard_coders(self, fallback_coder):
   def register_fallback_coder(self, fallback_coder):
     self._fallback_coder = FirstOf([fallback_coder, self._fallback_coder])

-  def _register_coder_internal(self, typehint_type, typehint_coder_class):
-    # type: (Any, Type[coders.Coder]) -> None
+  def _register_coder_internal(
+      self, typehint_type: Any,
+      typehint_coder_class: Type[coders.Coder]) -> None:
     self._coders[typehint_type] = typehint_coder_class

-  def register_coder(self, typehint_type, typehint_coder_class):
-    # type: (Any, Type[coders.Coder]) -> None
+  def register_coder(
+      self, typehint_type: Any,
+      typehint_coder_class: Type[coders.Coder]) -> None:
     if not isinstance(typehint_coder_class, type):
       raise TypeError(
           'Coder registration requires a coder class object. '
@@ -122,8 +125,7 @@ def register_coder(self, typehint_type, typehint_coder_class):
       typehint_type = str(typehint_type)
     self._register_coder_internal(typehint_type, typehint_coder_class)

-  def get_coder(self, typehint):
-    # type: (Any) -> coders.Coder
+  def get_coder(self, typehint: Any) -> coders.Coder:
     if typehint and typehint.__module__ == '__main__':
       # See https://issues.apache.org/jira/browse/BEAM-14250
       # TODO(robertwb): Remove once all runners are portable.
@@ -187,8 +189,7 @@ class FirstOf(object):
   """For internal use only; no backwards-compatibility guarantees.

   A class used to get the first matching coder from a list of coders."""
-  def __init__(self, coders):
-    # type: (Iterable[Type[coders.Coder]]) -> None
+  def __init__(self, coders: Iterable[Type[coders.Coder]]) -> None:
     self._coders = coders

   def from_type_hint(self, typehint, registry):
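The `Type[coders.Coder]` annotations spell out that the registry maps typehints to coder classes rather than instances; a coder object is only built when one is requested. A stripped-down sketch of that pattern, using stand-in classes rather than the real Beam types:

from typing import Any, Dict, Type


class Coder:
  pass


class BytesCoder(Coder):
  pass


_coders: Dict[Any, Type[Coder]] = {}  # typehint -> coder *class*
_coders[bytes] = BytesCoder

coder = _coders[bytes]()  # constructed at lookup time
assert isinstance(coder, BytesCoder)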
39 changes: 17 additions & 22 deletions sdks/python/apache_beam/dataframe/convert.py
@@ -14,11 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from __future__ import annotations
 import inspect
 import weakref
-from typing import TYPE_CHECKING
 from typing import Any
 from typing import Dict
+from typing import Optional
 from typing import Tuple
 from typing import Union

@@ -31,19 +32,13 @@
 from apache_beam.dataframe import schemas
 from apache_beam.dataframe import transforms

-if TYPE_CHECKING:
-  # pylint: disable=ungrouped-imports
-  from typing import Optional
-

 # TODO: Or should this be called as_dataframe?
 def to_dataframe(
-    pcoll,  # type: pvalue.PCollection
-    proxy=None,  # type: Optional[pd.core.generic.NDFrame]
-    label=None,  # type: Optional[str]
-):
-  # type: (...) -> frame_base.DeferredFrame
-
+    pcoll: pvalue.PCollection,
+    proxy: Optional[pd.core.generic.NDFrame] = None,
+    label: Optional[str] = None,
+) -> frame_base.DeferredFrame:
   """Converts a PCollection to a deferred dataframe-like object, which can
   be manipulated with pandas methods like `filter` and `groupby`.

@@ -80,10 +75,10 @@ def to_dataframe(
 # Note that the pipeline (indirectly) holds references to the transforms which
 # keeps both the PCollections and expressions alive. This ensures the
 # expression's ids are never accidentally re-used.
-TO_PCOLLECTION_CACHE = weakref.WeakValueDictionary(
-)  # type: weakref.WeakValueDictionary[str, pvalue.PCollection]
-UNBATCHED_CACHE = weakref.WeakValueDictionary(
-)  # type: weakref.WeakValueDictionary[str, pvalue.PCollection]
+TO_PCOLLECTION_CACHE: weakref.WeakValueDictionary[
+    str, pvalue.PCollection] = weakref.WeakValueDictionary()
+UNBATCHED_CACHE: weakref.WeakValueDictionary[
+    str, pvalue.PCollection] = weakref.WeakValueDictionary()


 def _make_unbatched_pcoll(
@@ -106,7 +101,7 @@


 def to_pcollection(
-    *dataframes,  # type: Union[frame_base.DeferredFrame, pd.DataFrame, pd.Series]
+    *dataframes: Union[frame_base.DeferredFrame, pd.DataFrame, pd.Series],
     label=None,
     always_return_tuple=False,
     yield_elements='schemas',
@@ -191,12 +186,12 @@ def extract_input(placeholder):
       df for df in dataframes if df._expr._id not in TO_PCOLLECTION_CACHE
   ]
   if len(new_dataframes):
-    new_results = {p: extract_input(p)
-                   for p in placeholders
-                   } | label >> transforms._DataframeExpressionsTransform({
-                       ix: df._expr
-                       for (ix, df) in enumerate(new_dataframes)
-                   })  # type: Dict[Any, pvalue.PCollection]
+    new_results: Dict[Any, pvalue.PCollection] = {
+        p: extract_input(p)
+        for p in placeholders
+    } | label >> transforms._DataframeExpressionsTransform(
+        {ix: df._expr
+         for (ix, df) in enumerate(new_dataframes)})

     TO_PCOLLECTION_CACHE.update(
         {new_dataframes[ix]._expr._id: pc
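The annotated caches are a case where the `__future__` import is load-bearing: `weakref.WeakValueDictionary` only became subscriptable in Python 3.9, so evaluating this annotation eagerly would fail on the older interpreters Beam still supported. Under postponed evaluation it stays a string. A sketch with a hypothetical `Thing` class:

from __future__ import annotations

import weakref


class Thing:
  pass


# The annotation below is never evaluated at runtime, so the module parses
# even where WeakValueDictionary does not support subscripting.
CACHE: weakref.WeakValueDictionary[str, Thing] = weakref.WeakValueDictionary()

thing = Thing()
CACHE['k'] = thing
assert CACHE['k'] is thing
del thing  # once the last strong reference is gone, the entry may vanish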
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/dataframe/doctests.py
@@ -36,6 +36,7 @@
 4. The comparison is then done on the sorted lines of the expected and actual
    values.
 """
+from __future__ import annotations

 import collections
 import contextlib
@@ -146,7 +147,7 @@ class _InMemoryResultRecorder(object):
   """

   # Class-level value to survive pickling.
-  _ALL_RESULTS = {}  # type: Dict[str, List[Any]]
+  _ALL_RESULTS: Dict[str, List[Any]] = {}

   def __init__(self):
     self._id = id(self)
34 changes: 16 additions & 18 deletions sdks/python/apache_beam/dataframe/expressions.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from __future__ import annotations
 import contextlib
 import random
 import threading
@@ -36,12 +37,12 @@ class Session(object):
   def __init__(self, bindings=None):
     self._bindings = dict(bindings or {})

-  def evaluate(self, expr):  # type: (Expression) -> Any
+  def evaluate(self, expr: Expression) -> Any:
     if expr not in self._bindings:
       self._bindings[expr] = expr.evaluate_at(self)
     return self._bindings[expr]

-  def lookup(self, expr):  # type: (Expression) -> Any
+  def lookup(self, expr: Expression) -> Any:
     return self._bindings[expr]


@@ -251,9 +252,9 @@ def preserves_partition_by(self) -> partitionings.Partitioning:
 class PlaceholderExpression(Expression):
   """An expression whose value must be explicitly bound in the session."""
   def __init__(
-      self,  # type: PlaceholderExpression
-      proxy,  # type: T
-      reference=None,  # type: Any
+      self: PlaceholderExpression,
+      proxy: T,
+      reference: Any = None,
   ):
     """Initialize a placeholder expression.

@@ -282,11 +283,7 @@ def preserves_partition_by(self):

 class ConstantExpression(Expression):
   """An expression whose value is known at pipeline construction time."""
-  def __init__(
-      self,  # type: ConstantExpression
-      value,  # type: T
-      proxy=None  # type: Optional[T]
-  ):
+  def __init__(self: ConstantExpression, value: T, proxy: Optional[T] = None):
     """Initialize a constant expression.

     Args:
@@ -319,14 +316,15 @@ def preserves_partition_by(self):
 class ComputedExpression(Expression):
   """An expression whose value must be computed at pipeline execution time."""
   def __init__(
-      self,  # type: ComputedExpression
-      name,  # type: str
-      func,  # type: Callable[...,T]
-      args,  # type: Iterable[Expression]
-      proxy=None,  # type: Optional[T]
-      _id=None,  # type: Optional[str]
-      requires_partition_by=partitionings.Index(),  # type: partitionings.Partitioning
-      preserves_partition_by=partitionings.Singleton(),  # type: partitionings.Partitioning
+      self: ComputedExpression,
+      name: str,
+      func: Callable[..., T],
+      args: Iterable[Expression],
+      proxy: Optional[T] = None,
+      _id: Optional[str] = None,
+      requires_partition_by: partitionings.Partitioning = partitionings.Index(),
+      preserves_partition_by: partitionings.Partitioning = partitionings.
+      Singleton(),
   ):
     """Initialize a computed expression.

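com2ann carries the `self` annotations over from the old comments (`self,  # type: ConstantExpression` and friends). Annotating `self` is ordinary, if uncommon, Python; a type checker uses it to bind the signature's type variables to one concrete type. A small sketch of the same pattern with a hypothetical `Expr` class:

from __future__ import annotations

from typing import Generic, Optional, TypeVar

T = TypeVar('T')


class Expr(Generic[T]):
  # Annotating `self` ties T for `value` and `proxy` to the same type,
  # mirroring the annotated __init__ methods in the diff above.
  def __init__(self: Expr[T], value: T, proxy: Optional[T] = None):
    self.value = value
    self.proxy = value if proxy is None else proxy


e = Expr(42)
assert e.proxy == 42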