4 changes: 2 additions & 2 deletions sdks/python/apache_beam/coders/coders.py
@@ -369,8 +369,8 @@ def from_runner_api(cls, coder_proto, context):
           context)
     except Exception:
       if context.allow_proto_holders:
-        return RunnerAPICoderHolder(
-            coder_proto)  # type: ignore # too ambiguous
+        # ignore this typing scenario for now, since it can't be easily tracked
+        return RunnerAPICoderHolder(coder_proto)  # type: ignore
       raise
 
   def to_runner_api_parameter(self, context):
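The hunk above trades a vague blanket comment for a one-line, targeted ignore. A minimal sketch of why the ignore is needed at all; everything here except the RunnerAPICoderHolder name is an illustrative stand-in, not the real Beam code:

  class Coder(object):
    """Stand-in for the declared return type of from_runner_api."""

  class RunnerAPICoderHolder(object):
    """Stand-in holder that wraps a coder proto we could not decode."""
    def __init__(self, coder_proto):
      self.proto = coder_proto

  def coder_from_runner_api(coder_proto, allow_proto_holders):
    # type: (object, bool) -> Coder
    try:
      raise Exception('decoding failed')  # stands in for the real constructor call
    except Exception:
      if allow_proto_holders:
        # The holder is not a Coder subclass, so mypy rejects this return;
        # the targeted '# type: ignore' keeps the runtime fallback intact.
        return RunnerAPICoderHolder(coder_proto)  # type: ignore
      raise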
24 changes: 12 additions & 12 deletions sdks/python/apache_beam/io/aws/s3filesystem_test.py
@@ -37,7 +37,7 @@
 try:
   from apache_beam.io.aws import s3filesystem
 except ImportError:
-  s3filesystem = None
+  s3filesystem = None  # type: ignore[assignment]
 # pylint: enable=wrong-import-order, wrong-import-position


@@ -84,7 +84,7 @@ def test_split(self):
   def test_match_multiples(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
     s3io_mock.list_prefix.return_value = {
         's3://bucket/file1': 1, 's3://bucket/file2': 2
     }
@@ -102,7 +102,7 @@ def test_match_multiples_limit(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
     limit = 1
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
     s3io_mock.list_prefix.return_value = {'s3://bucket/file1': 1}
     expected_results = set([FileMetadata('s3://bucket/file1', 1)])
     match_result = self.fs.match(['s3://bucket/'], [limit])[0]
@@ -114,7 +114,7 @@ def test_match_multiples_error(self, unused_mock_arg):
   def test_match_multiples_error(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
     exception = IOError('Failed')
     s3io_mock.list_prefix.side_effect = exception

@@ -128,7 +128,7 @@ def test_match_multiple_patterns(self, unused_mock_arg):
   def test_match_multiple_patterns(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
     s3io_mock.list_prefix.side_effect = [
         {
             's3://bucket/file1': 1
@@ -146,7 +146,7 @@ def test_create(self, unused_mock_arg):
   def test_create(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
     # Issue file copy
     _ = self.fs.create('s3://bucket/from1', 'application/octet-stream')

@@ -157,7 +157,7 @@ def test_open(self, unused_mock_arg):
   def test_open(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
     # Issue file copy
     _ = self.fs.open('s3://bucket/from1', 'application/octet-stream')

@@ -168,7 +168,7 @@ def test_copy_file(self, unused_mock_arg):
   def test_copy_file(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
 
     sources = ['s3://bucket/from1', 's3://bucket/from2']
     destinations = ['s3://bucket/to1', 's3://bucket/to2']
@@ -183,7 +183,7 @@ def test_copy_file_error(self, unused_mock_arg):
   def test_copy_file_error(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
 
     sources = ['s3://bucket/from1', 's3://bucket/from2', 's3://bucket/from3']
     destinations = ['s3://bucket/to1', 's3://bucket/to2']
@@ -196,7 +196,7 @@ def test_delete(self, unused_mock_arg):
   def test_delete(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
     s3io_mock.size.return_value = 0
     files = [
         's3://bucket/from1',
@@ -212,7 +212,7 @@ def test_delete_error(self, unused_mock_arg):
   def test_delete_error(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
 
     problematic_directory = 's3://nonexistent-bucket/tree/'
     exception = messages.S3ClientError('Not found', 404)
@@ -242,7 +242,7 @@ def test_delete_error(self, unused_mock_arg):
   def test_rename(self, unused_mock_arg):
     # Prepare mocks.
     s3io_mock = mock.MagicMock()
-    s3filesystem.s3io.S3IO = lambda: s3io_mock
+    s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
 
     sources = ['s3://bucket/from1', 's3://bucket/from2']
     destinations = ['s3://bucket/to1', 's3://bucket/to2']
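The test changes all lean on two scoped mypy error codes rather than a bare ignore. A minimal, self-contained sketch of both situations; the module and test names are hypothetical, and only the ignore pattern mirrors the diff:

  import unittest
  from unittest import mock

  try:
    from apache_beam.io.aws import s3filesystem
  except ImportError:
    # mypy infers this name as a module, so rebinding it to None
    # needs ignore[assignment].
    s3filesystem = None  # type: ignore[assignment]


  class S3IOPatchTest(unittest.TestCase):
    @unittest.skipIf(s3filesystem is None, 'AWS dependencies are not installed')
    def test_list_prefix_via_mock(self):
      s3io_mock = mock.MagicMock()
      # Rebinding the S3IO class to a zero-argument lambda is fine at runtime,
      # but mypy rejects assigning to a type, hence ignore[misc].
      s3filesystem.s3io.S3IO = lambda: s3io_mock  # type: ignore[misc]
      s3io_mock.list_prefix.return_value = {'s3://bucket/file1': 1}
      self.assertEqual(
          s3filesystem.s3io.S3IO().list_prefix('s3://bucket/'),
          {'s3://bucket/file1': 1})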
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/io/aws/s3io.py
@@ -218,16 +218,16 @@ def copy_paths(self, src_dest_pairs):
         try:
           self.client.copy(request)
           results.append((src_path, dest_path, None))
-        except messages.S3ClientError as e:
-          results.append((src_path, dest_path, e))
+        except messages.S3ClientError as err:
+          results.append((src_path, dest_path, err))
 
       # Mismatched paths (one directory, one non-directory) get an error result
       else:
-        err = messages.S3ClientError(
+        e = messages.S3ClientError(
             "Can't copy mismatched paths (one directory, one non-directory):" +
             ' %s, %s' % (src_path, dest_path),
             400)
-        results.append((src_path, dest_path, err))
+        results.append((src_path, dest_path, e))
 
     return results

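The variable renames in copy_paths look cosmetic but are most plausibly about the type checker: a name bound with `except ... as` is implicitly deleted when its block exits, and reusing that name in a sibling branch can trip mypy even though the runtime behavior is unchanged. A sketch of the resulting shape, with simplified stand-ins for the S3 client pieces (this rationale is an inference; the PR itself does not state it):

  class S3ClientError(Exception):
    def __init__(self, message, code):
      super(S3ClientError, self).__init__(message)
      self.code = code

  def copy_paths(src_dest_pairs, copy_fn):
    """Copy each (src, dest) pair, collecting a per-pair error or None."""
    results = []
    for src_path, dest_path in src_dest_pairs:
      if not src_path.endswith('/') and not dest_path.endswith('/'):
        try:
          copy_fn(src_path, dest_path)
          results.append((src_path, dest_path, None))
        except S3ClientError as err:  # 'err' is deleted after this block exits
          results.append((src_path, dest_path, err))
      else:
        # A distinct name avoids colliding with the except-bound 'err' above,
        # which Python deletes and mypy therefore tracks separately.
        e = S3ClientError(
            "Can't copy mismatched paths: %s, %s" % (src_path, dest_path), 400)
        results.append((src_path, dest_path, e))
    return results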
35 changes: 33 additions & 2 deletions sdks/python/apache_beam/io/filesystem.py
@@ -39,6 +39,9 @@
 from builtins import object
 from builtins import zip
 from typing import BinaryIO  # pylint: disable=unused-import
+from typing import Iterator
+from typing import List
+from typing import Optional
 from typing import Tuple
 
 from future.utils import with_metaclass
@@ -128,7 +131,7 @@ class CompressedFile(object):

   def __init__(
       self,
-      fileobj,
+      fileobj,  # type: BinaryIO
       compression_type=CompressionTypes.GZIP,
       read_size=DEFAULT_READ_BUFFER_SIZE):
     if not fileobj:
@@ -150,7 +153,7 @@ def __init__(
       raise ValueError(
           'File object must be at position 0 but was %d' % self._file.tell())
     self._uncompressed_position = 0
-    self._uncompressed_size = None
+    self._uncompressed_size = None  # type: Optional[int]
 
     if self.readable():
       self._read_size = read_size
@@ -188,14 +191,18 @@ def _initialize_compressor(self):
         zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, self._gzip_mask)
 
   def readable(self):
+    # type: () -> bool
     mode = self._file.mode
     return 'r' in mode or 'a' in mode
 
   def writeable(self):
+    # type: () -> bool
     mode = self._file.mode
     return 'w' in mode or 'a' in mode
 
   def write(self, data):
+    # type: (bytes) -> None
+
     """Write data to file."""
     if not self._compressor:
       raise ValueError('compressor not initialized')
@@ -205,6 +212,8 @@ def write(self, data):
     self._file.write(compressed)
 
   def _fetch_to_internal_buffer(self, num_bytes):
+    # type: (int) -> None
+
     """Fetch up to num_bytes into the internal buffer."""
     if (not self._read_eof and self._read_position > 0 and
         (self._read_buffer.tell() - self._read_position) < num_bytes):
@@ -259,6 +268,7 @@ def _read_from_internal_buffer(self, read_fn):
     return result
 
   def read(self, num_bytes):
+    # type: (int) -> bytes
     if not self._decompressor:
       raise ValueError('decompressor not initialized')
 
@@ -267,6 +277,8 @@ def read(self, num_bytes):
         lambda: self._read_buffer.read(num_bytes))
 
   def readline(self):
+    # type: () -> bytes
+
     """Equivalent to standard file.readline(). Same return conventions apply."""
     if not self._decompressor:
       raise ValueError('decompressor not initialized')
@@ -287,9 +299,11 @@ def readline(self):
     return bytes_io.getvalue()
 
   def closed(self):
+    # type: () -> bool
     return not self._file or self._file.closed()
 
   def close(self):
+    # type: () -> None
     if self.readable():
       self._read_buffer.close()

@@ -299,29 +313,37 @@ def close(self):
     self._file.close()
 
   def flush(self):
+    # type: () -> None
     if self.writeable():
       self._file.write(self._compressor.flush())
     self._file.flush()
 
   @property
   def seekable(self):
+    # type: () -> bool
     return 'r' in self._file.mode
 
   def _clear_read_buffer(self):
+    # type: () -> None
+
     """Clears the read buffer by removing all the contents and
     resetting _read_position to 0"""
     self._read_position = 0
     self._read_buffer.seek(0)
     self._read_buffer.truncate(0)
 
   def _rewind_file(self):
+    # type: () -> None
+
     """Seeks to the beginning of the input file. Input file's EOF marker
     is cleared and _uncompressed_position is reset to zero"""
     self._file.seek(0, os.SEEK_SET)
     self._read_eof = False
     self._uncompressed_position = 0
 
   def _rewind(self):
+    # type: () -> None
+
     """Seeks to the beginning of the input file and resets the internal read
     buffer. The decompressor object is re-initialized to ensure that no data
     left in it's buffer."""
@@ -332,6 +354,8 @@ def _rewind(self):
     self._initialize_decompressor()
 
   def seek(self, offset, whence=os.SEEK_SET):
+    # type: (int, int) -> None
+
     """Set the file's current offset.
 
     Seeking behavior:
@@ -396,6 +420,8 @@ def seek(self, offset, whence=os.SEEK_SET):
         bytes_to_skip -= len(data)
 
   def tell(self):
+    # type: () -> int
+
     """Returns current position in uncompressed file."""
     return self._uncompressed_position

@@ -439,6 +465,7 @@ class MatchResult(object):
   of matched ``FileMetadata``.
   """
   def __init__(self, pattern, metadata_list):
+    # type: (str, List[FileMetadata]) -> None
     self.metadata_list = metadata_list
     self.pattern = pattern

@@ -583,6 +610,8 @@ def _url_dirname(self, url_or_path):
     return self._combine_scheme(scheme, posixpath.dirname(path))
 
   def match_files(self, file_metas, pattern):
+    # type: (List[FileMetadata], str) -> Iterator[FileMetadata]
+
     """Filter :class:`FileMetadata` objects by *pattern*
 
     Args:
@@ -604,6 +633,8 @@ def match_files(self, file_metas, pattern):

   @staticmethod
   def translate_pattern(pattern):
+    # type: (str) -> str
+
     """
     Translate a *pattern* to a regular expression.
     There is no way to quote meta-characters.
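Every annotation added to filesystem.py uses PEP 484 type comments rather than Python 3 annotation syntax, which keeps the module importable under Python 2 while still giving mypy full signatures. A short sketch of the convention; the class is a hypothetical stand-in, not Beam's CompressedFile:

  from typing import List, Optional

  class BufferedChunks(object):
    def __init__(self, chunks):
      # type: (List[bytes]) -> None
      self._chunks = chunks
      # Attribute annotations ride on the assignment itself:
      self._size = None  # type: Optional[int]

    def readable(self):
      # type: () -> bool
      return True

    def read(self, num_bytes):
      # type: (int) -> bytes

      """A blank line may separate the type comment from the docstring,
      matching the style used throughout this diff."""
      return b''.join(self._chunks)[:num_bytes]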
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/iobase.py
@@ -1336,6 +1336,7 @@ def fraction_remaining(self):
     return float(self._remaining) / self.total_work
 
   def with_completed(self, completed):
+    # type: (int) -> RestrictionProgress
     return RestrictionProgress(
         fraction=self._fraction, remaining=self._remaining, completed=completed)

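One detail worth noting in the annotation on with_completed: because a type comment is parsed only by the checker, a method can name its own class as its return type without the string forward reference that Python 3 annotation syntax would need here. A hedged sketch with a simplified stand-in class:

  class RestrictionProgress(object):
    def __init__(self, remaining=0, completed=0):
      # type: (int, int) -> None
      self._remaining = remaining
      self._completed = completed

    def with_completed(self, completed):
      # type: (int) -> RestrictionProgress
      # No quoting needed: names in type comments are resolved lazily.
      return RestrictionProgress(
          remaining=self._remaining, completed=completed)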