Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/io/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ class FileSystem(BeamPlugin):
__metaclass__ = abc.ABCMeta
CHUNK_SIZE = 1 # Chunk size in the batch operations

def __init__(self, pipeline_options):
  """Initializes the FileSystem base class.

  The base class accepts the options but does not store them; concrete
  subclasses that need configuration are expected to read what they need
  from ``pipeline_options`` in their own ``__init__``.

  Args:
    pipeline_options: Instance of ``PipelineOptions``.
  """

@staticmethod
def _get_compression_type(path, compression_type):
if compression_type == CompressionTypes.AUTO:
Expand Down
13 changes: 12 additions & 1 deletion sdks/python/apache_beam/io/filesystems.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ class FileSystems(object):
"""
URI_SCHEMA_PATTERN = re.compile('(?P<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*')

_pipeline_options = None

@classmethod
def set_options(cls, pipeline_options):
  """Set filesystem options.

  Stores the options on the class so that ``get_filesystem`` can pass
  them to the ``FileSystem`` subclass it instantiates.

  Args:
    pipeline_options: Instance of ``PipelineOptions``.
  """
  # BUG FIX: previously wrote to `cls._options`, an attribute nothing
  # reads -- `get_filesystem` consults `cls._pipeline_options` (declared
  # as None on the class), so the options set here were silently lost.
  cls._pipeline_options = pipeline_options

@staticmethod
def get_scheme(path):
match_result = FileSystems.URI_SCHEMA_PATTERN.match(path.strip())
Expand All @@ -60,7 +71,7 @@ def get_filesystem(path):
if len(systems) == 0:
raise ValueError('Unable to get the Filesystem for path %s' % path)
elif len(systems) == 1:
return systems[0]()
return systems[0](pipeline_options=FileSystems._pipeline_options)
else:
raise ValueError('Found more than one filesystem for path %s' % path)
except ValueError:
Expand Down
71 changes: 30 additions & 41 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import FileMetadata
from apache_beam.options.pipeline_options import PipelineOptions

# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
Expand All @@ -37,39 +38,40 @@
@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
class GCSFileSystemTest(unittest.TestCase):

def setUp(self):
  # Every test exercises one shared GCSFileSystem instance, constructed
  # with default (empty) pipeline options.
  pipeline_options = PipelineOptions()
  self.fs = gcsfilesystem.GCSFileSystem(pipeline_options=pipeline_options)

def test_scheme(self):
  """Both the instance and the class report the 'gs' scheme."""
  # Merged to the post-diff version: the removed pre-change lines that
  # constructed a local `file_system` were interleaved in the paste.
  self.assertEqual(self.fs.scheme(), 'gs')
  self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs')

def test_join(self):
  """join() normalizes slashes between the base path and components."""
  # Merged to the post-diff version: removed pre-change continuation
  # lines (using a local `file_system`) were interleaved in the paste.
  self.assertEqual('gs://bucket/path/to/file',
                   self.fs.join('gs://bucket/path', 'to', 'file'))
  self.assertEqual('gs://bucket/path/to/file',
                   self.fs.join('gs://bucket/path', 'to/file'))
  self.assertEqual('gs://bucket/path/to/file',
                   self.fs.join('gs://bucket/path', '/to/file'))
  self.assertEqual('gs://bucket/path/to/file',
                   self.fs.join('gs://bucket/path/', 'to', 'file'))
  self.assertEqual('gs://bucket/path/to/file',
                   self.fs.join('gs://bucket/path/', 'to/file'))
  self.assertEqual('gs://bucket/path/to/file',
                   self.fs.join('gs://bucket/path/', '/to/file'))
  # Paths without the gs:// scheme are rejected.
  with self.assertRaises(ValueError):
    self.fs.join('/bucket/path/', '/to/file')

def test_split(self):
  """split() returns (head, tail); head never loses the bucket."""
  # Merged to the post-diff version: removed pre-change continuation
  # lines (using a local `file_system`) were interleaved in the paste.
  self.assertEqual(('gs://foo/bar', 'baz'),
                   self.fs.split('gs://foo/bar/baz'))
  self.assertEqual(('gs://foo', ''),
                   self.fs.split('gs://foo/'))
  self.assertEqual(('gs://foo', ''),
                   self.fs.split('gs://foo'))

  # Paths without the gs:// scheme are rejected.
  with self.assertRaises(ValueError):
    self.fs.split('/no/gcs/prefix')

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiples(self, mock_gcsio):
Expand All @@ -84,8 +86,7 @@ def test_match_multiples(self, mock_gcsio):
FileMetadata('gs://bucket/file1', 1),
FileMetadata('gs://bucket/file2', 2)
])
file_system = gcsfilesystem.GCSFileSystem()
match_result = file_system.match(['gs://bucket/'])[0]
match_result = self.fs.match(['gs://bucket/'])[0]
self.assertEqual(
set(match_result.metadata_list),
expected_results)
Expand All @@ -104,8 +105,7 @@ def test_match_multiples_limit(self, mock_gcsio):
expected_results = set([
FileMetadata('gs://bucket/file1', 1)
])
file_system = gcsfilesystem.GCSFileSystem()
match_result = file_system.match(['gs://bucket/'], [limit])[0]
match_result = self.fs.match(['gs://bucket/'], [limit])[0]
self.assertEqual(
set(match_result.metadata_list),
expected_results)
Expand All @@ -124,10 +124,9 @@ def test_match_multiples_error(self, mock_gcsio):
gcsio_mock.size_of_files_in_glob.side_effect = exception
expected_results = {'gs://bucket/': exception}

file_system = gcsfilesystem.GCSFileSystem()
with self.assertRaisesRegexp(BeamIOError,
r'^Match operation failed') as error:
file_system.match(['gs://bucket/'])
self.fs.match(['gs://bucket/'])
self.assertEqual(error.exception.exception_details, expected_results)
gcsio_mock.size_of_files_in_glob.assert_called_once_with(
'gs://bucket/*', None)
Expand All @@ -145,8 +144,7 @@ def test_match_multiple_patterns(self, mock_gcsio):
[FileMetadata('gs://bucket/file1', 1)],
[FileMetadata('gs://bucket/file2', 2)]
]
file_system = gcsfilesystem.GCSFileSystem()
result = file_system.match(['gs://bucket/file1*', 'gs://bucket/file2*'])
result = self.fs.match(['gs://bucket/file1*', 'gs://bucket/file2*'])
self.assertEqual(
[mr.metadata_list for mr in result],
expected_results)
Expand All @@ -157,8 +155,7 @@ def test_create(self, mock_gcsio):
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
# Issue file copy
file_system = gcsfilesystem.GCSFileSystem()
_ = file_system.create('gs://bucket/from1', 'application/octet-stream')
_ = self.fs.create('gs://bucket/from1', 'application/octet-stream')

gcsio_mock.open.assert_called_once_with(
'gs://bucket/from1', 'wb', mime_type='application/octet-stream')
Expand All @@ -169,8 +166,7 @@ def test_open(self, mock_gcsio):
gcsio_mock = mock.MagicMock()
gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
# Issue file copy
file_system = gcsfilesystem.GCSFileSystem()
_ = file_system.open('gs://bucket/from1', 'application/octet-stream')
_ = self.fs.open('gs://bucket/from1', 'application/octet-stream')

gcsio_mock.open.assert_called_once_with(
'gs://bucket/from1', 'rb', mime_type='application/octet-stream')
Expand All @@ -184,8 +180,7 @@ def test_copy_file(self, mock_gcsio):
destinations = ['gs://bucket/to1']

# Issue file copy
file_system = gcsfilesystem.GCSFileSystem()
file_system.copy(sources, destinations)
self.fs.copy(sources, destinations)

gcsio_mock.copy.assert_called_once_with(
'gs://bucket/from1', 'gs://bucket/to1')
Expand All @@ -205,10 +200,9 @@ def test_copy_file_error(self, mock_gcsio):
expected_results = {(s, d):exception for s, d in zip(sources, destinations)}

# Issue batch copy.
file_system = gcsfilesystem.GCSFileSystem()
with self.assertRaisesRegexp(BeamIOError,
r'^Copy operation failed') as error:
file_system.copy(sources, destinations)
self.fs.copy(sources, destinations)
self.assertEqual(error.exception.exception_details, expected_results)

gcsio_mock.copy.assert_called_once_with(
Expand All @@ -223,8 +217,7 @@ def test_copy_tree(self, mock_gcsio):
destinations = ['gs://bucket2/']

# Issue directory copy
file_system = gcsfilesystem.GCSFileSystem()
file_system.copy(sources, destinations)
self.fs.copy(sources, destinations)

gcsio_mock.copytree.assert_called_once_with(
'gs://bucket1/', 'gs://bucket2/')
Expand Down Expand Up @@ -256,8 +249,7 @@ def test_rename(self, mock_gcsio):
]]

# Issue batch rename.
file_system = gcsfilesystem.GCSFileSystem()
file_system.rename(sources, destinations)
self.fs.rename(sources, destinations)

gcsio_mock.copy_batch.assert_called_once_with([
('gs://bucket/from1', 'gs://bucket/to1'),
Expand Down Expand Up @@ -297,10 +289,9 @@ def test_rename_error(self, mock_gcsio):
expected_results = {(s, d):exception for s, d in zip(sources, destinations)}

# Issue batch rename.
file_system = gcsfilesystem.GCSFileSystem()
with self.assertRaisesRegexp(BeamIOError,
r'^Rename operation failed') as error:
file_system.rename(sources, destinations)
self.fs.rename(sources, destinations)
self.assertEqual(error.exception.exception_details, expected_results)

gcsio_mock.copy_batch.assert_called_once_with([
Expand All @@ -326,8 +317,7 @@ def test_delete(self, mock_gcsio):
]

# Issue batch delete.
file_system = gcsfilesystem.GCSFileSystem()
file_system.delete(files)
self.fs.delete(files)
gcsio_mock.delete_batch.assert_called()

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
Expand All @@ -345,9 +335,8 @@ def test_delete_error(self, mock_gcsio):
expected_results = {f:exception for f in files}

# Issue batch delete.
file_system = gcsfilesystem.GCSFileSystem()
with self.assertRaisesRegexp(BeamIOError,
r'^Delete operation failed') as error:
file_system.delete(files)
self.fs.delete(files)
self.assertEqual(error.exception.exception_details, expected_results)
gcsio_mock.delete_batch.assert_called()
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/hadoopfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ class HadoopFileSystem(FileSystem):
Uses client library :class:`hdfs3.core.HDFileSystem`.
"""

def __init__(self, pipeline_options):
  """Initializes a connection to HDFS.

  Connection configuration is done using :doc:`hdfs`.

  Args:
    pipeline_options: Instance of ``PipelineOptions``; forwarded to the
      ``FileSystem`` base class. Not otherwise consulted here -- the
      HDFS client is configured via the hdfs3 mechanism noted above.
  """
  # Merged to the post-diff version: the paste interleaved the removed
  # zero-argument signature and `super().__init__()` call with the new
  # pipeline_options-taking ones, leaving duplicated statements.
  super(HadoopFileSystem, self).__init__(pipeline_options)
  self._hdfs_client = HDFileSystem()

@classmethod
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/io/hadoopfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from apache_beam.io import hadoopfilesystem
from apache_beam.io.filesystem import BeamIOError
from apache_beam.options.pipeline_options import PipelineOptions


class FakeFile(StringIO.StringIO):
Expand Down Expand Up @@ -166,7 +167,8 @@ class HadoopFileSystemTest(unittest.TestCase):
def setUp(self):
self._fake_hdfs = FakeHdfs()
hadoopfilesystem.HDFileSystem = lambda *args, **kwargs: self._fake_hdfs
self.fs = hadoopfilesystem.HadoopFileSystem()
pipeline_options = PipelineOptions()
self.fs = hadoopfilesystem.HadoopFileSystem(pipeline_options)
self.tmpdir = 'hdfs://test_dir'

for filename in ['old_file1', 'old_file2']:
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/io/localfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from apache_beam.io import localfilesystem
from apache_beam.io.filesystem import BeamIOError
from apache_beam.options.pipeline_options import PipelineOptions


def _gen_fake_join(separator):
Expand Down Expand Up @@ -56,7 +57,8 @@ class LocalFileSystemTest(unittest.TestCase):

def setUp(self):
  """Creates a scratch directory and the filesystem under test."""
  # Merged to the post-diff version: the removed pre-change construction
  # `LocalFileSystem()` (no options) was interleaved in the paste and
  # would clash with the new pipeline_options-taking constructor.
  self.tmpdir = tempfile.mkdtemp()
  pipeline_options = PipelineOptions()
  self.fs = localfilesystem.LocalFileSystem(pipeline_options)

def tearDown(self):
shutil.rmtree(self.tmpdir)
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

from apache_beam import pvalue
from apache_beam.internal import pickler
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
Expand Down Expand Up @@ -124,6 +125,8 @@ def __init__(self, runner=None, options=None, argv=None):
else:
self._options = PipelineOptions([])

FileSystems.set_options(self._options)

if runner is None:
runner = self._options.view_as(StandardOptions).runner
if runner is None:
Expand Down