Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][BEAM-2784] Run python 2 to 3 migration and fix resulting Python 2 errors #3772

5 changes: 3 additions & 2 deletions sdks/python/apache_beam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@

if not (sys.version_info[0] == 2 and sys.version_info[1] == 7):
raise RuntimeError(
'The Apache Beam SDK for Python is supported only on Python 2.7. '
'It is not supported on Python ['+ str(sys.version_info) + '].')
'The Apache Beam SDK for Python is supported only on Python 2.7.'
'It is not supported on Python {0}.'
.format(sys.version))

# pylint: disable=wrong-import-position
import apache_beam.internal.pickler
Expand Down
74 changes: 46 additions & 28 deletions sdks/python/apache_beam/coders/coder_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,37 @@

For internal use only; no backwards-compatibility guarantees.
"""
from types import NoneType
from __future__ import absolute_import, division

import sys
from builtins import chr, object, range

from past.utils import old_div

from apache_beam.coders import observable
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import MAX_TIMESTAMP, MIN_TIMESTAMP, Timestamp

if sys.version_info[0] >= 3:
basestring = str
long = int
unicode = str


# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
from stream import InputStream as create_InputStream
from stream import OutputStream as create_OutputStream
from stream import ByteCountingOutputStream
from stream import get_varint_size
from .stream import InputStream as create_InputStream
from .stream import OutputStream as create_OutputStream
from .stream import ByteCountingOutputStream
from .stream import get_varint_size
globals()['create_InputStream'] = create_InputStream
globals()['create_OutputStream'] = create_OutputStream
globals()['ByteCountingOutputStream'] = ByteCountingOutputStream
except ImportError:
from slow_stream import InputStream as create_InputStream
from slow_stream import OutputStream as create_OutputStream
from slow_stream import ByteCountingOutputStream
from slow_stream import get_varint_size
from .slow_stream import InputStream as create_InputStream
from .slow_stream import OutputStream as create_OutputStream
from .slow_stream import ByteCountingOutputStream
from .slow_stream import get_varint_size
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports


Expand Down Expand Up @@ -154,7 +163,8 @@ def encode_to_stream(self, value, stream, nested):
return stream.write(self._encoder(value), nested)

def decode_from_stream(self, stream, nested):
return self._decoder(stream.read_all(nested))
read_from_stream = stream.read_all(nested)
return self._decoder(read_from_stream)

def encode(self, value):
return self._encoder(value)
Expand Down Expand Up @@ -182,7 +192,7 @@ def __init__(self, coder, step_label):
self._step_label = step_label

def _check_safe(self, value):
if isinstance(value, (str, unicode, long, int, float)):
if isinstance(value, (str, basestring, bytes, int, long, float)):
pass
elif value is None:
pass
Expand Down Expand Up @@ -262,18 +272,18 @@ def get_estimated_size_and_observables(self, value, nested=False):

def encode_to_stream(self, value, stream, nested):
t = type(value)
if t is NoneType:
if value is None:
stream.write_byte(NONE_TYPE)
elif t is int:
stream.write_byte(INT_TYPE)
stream.write_var_int64(value)
elif t is float:
stream.write_byte(FLOAT_TYPE)
stream.write_bigendian_double(value)
elif t is str:
elif t is bytes:
stream.write_byte(STR_TYPE)
stream.write(value, nested)
elif t is unicode:
elif t is unicode or t is basestring:
unicode_value = value # for typing
stream.write_byte(UNICODE_TYPE)
stream.write(unicode_value.encode('utf-8'), nested)
Expand All @@ -287,7 +297,7 @@ def encode_to_stream(self, value, stream, nested):
dict_value = value # for typing
stream.write_byte(DICT_TYPE)
stream.write_var_int64(len(dict_value))
for k, v in dict_value.iteritems():
for k, v in dict_value.items():
self.encode_to_stream(k, stream, True)
self.encode_to_stream(v, stream, True)
elif t is bool:
Expand All @@ -306,14 +316,17 @@ def decode_from_stream(self, stream, nested):
elif t == FLOAT_TYPE:
return stream.read_bigendian_double()
elif t == STR_TYPE:
return stream.read_all(nested)
if sys.version_info[0] < 3:
return str(stream.read_all(nested))
else:
return stream.read_all(nested)
elif t == UNICODE_TYPE:
return stream.read_all(nested).decode('utf-8')
elif t == LIST_TYPE or t == TUPLE_TYPE or t == SET_TYPE:
vlen = stream.read_var_int64()
vlist = [self.decode_from_stream(stream, True) for _ in range(vlen)]
if t == LIST_TYPE:
return vlist
return list(vlist)
elif t == TUPLE_TYPE:
return tuple(vlist)
return set(vlist)
Expand All @@ -333,7 +346,7 @@ def decode_from_stream(self, stream, nested):
class BytesCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

A coder for bytes/str objects."""
A coder for bytes/str objects. In Python3 this will return bytes not strs."""

def encode_to_stream(self, value, out, nested):
out.write(value, nested)
Expand All @@ -342,7 +355,11 @@ def decode_from_stream(self, in_stream, nested):
return in_stream.read_all(nested)

def encode(self, value):
assert isinstance(value, bytes), (value, type(value))
assert isinstance(value, (bytes, str))
if isinstance(value, bytes):
return value
elif isinstance(value, str):
return value.encode('latin-1')
return value

def decode(self, encoded):
Expand Down Expand Up @@ -377,8 +394,9 @@ def _from_normal_time(self, value):

def encode_to_stream(self, value, out, nested):
span_micros = value.end.micros - value.start.micros
out.write_bigendian_uint64(self._from_normal_time(value.end.micros / 1000))
out.write_var_int64(span_micros / 1000)
out.write_bigendian_uint64(self._from_normal_time(
old_div(value.end.micros, 1000)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer // to old_div if the types are integral (as they are here), and plain old / for floats.

out.write_var_int64(old_div(span_micros, 1000))

def decode_from_stream(self, in_, nested):
end_millis = self._to_normal_time(in_.read_bigendian_uint64())
Expand All @@ -392,7 +410,7 @@ def estimate_size(self, value, nested=False):
# An IntervalWindow is context-insensitive, with a timestamp (8 bytes)
# and a varint timespan.
span = value.end.micros - value.start.micros
return 8 + get_varint_size(span / 1000)
return 8 + get_varint_size(old_div(span, 1000))


class TimestampCoderImpl(StreamCoderImpl):
Expand Down Expand Up @@ -689,7 +707,7 @@ def encode_to_stream(self, value, out, nested):
# TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
# precision of timestamps.
self._from_normal_time(
restore_sign * (abs(wv.timestamp_micros) / 1000)))
restore_sign * (old_div(abs(wv.timestamp_micros), 1000))))
self._windows_coder.encode_to_stream(wv.windows, out, True)
# Default PaneInfo encoded byte representing NO_FIRING.
# TODO(BEAM-1522): Remove the hard coding here once PaneInfo is supported.
Expand All @@ -704,9 +722,9 @@ def decode_from_stream(self, in_stream, nested):
# were indeed MIN/MAX timestamps.
# TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
# precision of timestamps.
if timestamp == -(abs(MIN_TIMESTAMP.micros) / 1000):
if timestamp == -(old_div(abs(MIN_TIMESTAMP.micros), 1000)):
timestamp = MIN_TIMESTAMP.micros
elif timestamp == (MAX_TIMESTAMP.micros / 1000):
elif timestamp == (old_div(MAX_TIMESTAMP.micros, 1000)):
timestamp = MAX_TIMESTAMP.micros
else:
timestamp *= 1000
Expand Down
40 changes: 33 additions & 7 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,32 @@

Only those coders listed in __all__ are part of the public API of this module.
"""
from __future__ import absolute_import

import base64
import cPickle as pickle
import sys
from builtins import object

import google.protobuf
from future import standard_library

from apache_beam.coders import coder_impl
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.utils import urns
from apache_beam.utils import proto_utils
from apache_beam.utils import proto_utils, urns

standard_library.install_aliases()

if sys.version_info[0] >= 3:
import pickle as pickle
else:
import cPickle as pickle


# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
from stream import get_varint_size
from .stream import get_varint_size
except ImportError:
from slow_stream import get_varint_size
from .slow_stream import get_varint_size
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports


Expand All @@ -52,7 +63,7 @@
__all__ = ['Coder',
'BytesCoder', 'DillCoder', 'FastPrimitivesCoder', 'FloatCoder',
'IterableCoder', 'PickleCoder', 'ProtoCoder', 'SingletonCoder',
'StrUtf8Coder', 'TimestampCoder', 'TupleCoder',
'StrUtf8Coder', 'StrUtf8StrCoder', 'TimestampCoder', 'TupleCoder',
'TupleSequenceCoder', 'VarIntCoder', 'WindowedValueCoder']


Expand Down Expand Up @@ -291,11 +302,26 @@ def is_deterministic(self):
return True


class StrUtf8StrCoder(Coder):
  """A coder that reads and writes strings as UTF-8, decoding to native str.

  Used for Python 2 to force the decoded value into ``str`` rather than
  ``unicode``. On Python 2 the native ``str`` is a byte string, so the decoded
  value is the validated UTF-8 bytes; on Python 3 it is ordinary text.
  """

  def encode(self, value):
    """Return ``value`` as UTF-8 encoded bytes.

    Byte strings are passed through unchanged: they are what ``decode``
    returns on Python 2, and calling ``.encode`` on a Python 2 byte string
    would first decode it with the ASCII codec and fail on non-ASCII bytes.
    """
    if isinstance(value, bytes):
      return value
    return value.encode('utf-8')

  def decode(self, value):
    """Decode UTF-8 ``value`` and return it as the native ``str`` type.

    Raises:
      UnicodeDecodeError: if ``value`` is not valid UTF-8.
    """
    text = value.decode('utf-8')
    if str is bytes:
      # Python 2: str(text) would encode with the ASCII codec and raise
      # UnicodeEncodeError for non-ASCII characters, so encode explicitly.
      return text.encode('utf-8')
    return text

  def is_deterministic(self):
    """Report this coder as deterministic; encode depends only on the value."""
    return True


class ToStringCoder(Coder):
"""A default string coder used if no sink coder is specified."""

def encode(self, value):
if isinstance(value, unicode):
# TODO(holden before merge): Do we just want to send everything to UTF-8?
if isinstance(value, str):
return value.encode('utf-8')
elif isinstance(value, str):
return value
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/coders/coders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import base64
import logging
import unittest
from builtins import object

from apache_beam.coders import coders
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
from apache_beam.coders import coders
from apache_beam.coders.typecoders import registry as coders_registry


Expand Down
Loading