diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 0efdb0ef7519..424462ab30c9 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -431,6 +431,13 @@ class FileSystem(BeamPlugin):
   __metaclass__ = abc.ABCMeta
   CHUNK_SIZE = 1  # Chuck size in the batch operations
 
+  def __init__(self, pipeline_options):
+    """Accepts pipeline options; this base implementation does not store them.
+
+    Args:
+      pipeline_options: Instance of ``PipelineOptions``.
+    """
+
   @staticmethod
   def _get_compression_type(path, compression_type):
     if compression_type == CompressionTypes.AUTO:
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 0c82a7e25dc8..dad4e5f9f27d 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -42,6 +42,20 @@ class FileSystems(object):
   """
   URI_SCHEMA_PATTERN = re.compile('(?P<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*')
 
+  # Options stored by set_options() and passed to every filesystem that
+  # get_filesystem() instantiates. Stays None until set_options() is called.
+  _pipeline_options = None
+
+  @classmethod
+  def set_options(cls, pipeline_options):
+    """Set filesystem options.
+
+    Args:
+      pipeline_options: Instance of ``PipelineOptions``.
+    """
+    # Assign the attribute get_filesystem() reads, not a differently named one.
+    cls._pipeline_options = pipeline_options
+
   @staticmethod
   def get_scheme(path):
     match_result = FileSystems.URI_SCHEMA_PATTERN.match(path.strip())
@@ -60,7 +74,7 @@ def get_filesystem(path):
       if len(systems) == 0:
         raise ValueError('Unable to get the Filesystem for path %s' % path)
       elif len(systems) == 1:
-        return systems[0]()
+        return systems[0](pipeline_options=FileSystems._pipeline_options)
       else:
         raise ValueError('Found more than one filesystem for path %s' % path)
     except ValueError:
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index c174e48778e5..bc55b08f7dda 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -24,6 +24,7 @@
 
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystem import FileMetadata
+from apache_beam.options.pipeline_options import PipelineOptions
 
 # Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position @@ -37,39 +38,40 @@ @unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed') class GCSFileSystemTest(unittest.TestCase): + def setUp(self): + pipeline_options = PipelineOptions() + self.fs = gcsfilesystem.GCSFileSystem(pipeline_options=pipeline_options) + def test_scheme(self): - file_system = gcsfilesystem.GCSFileSystem() - self.assertEqual(file_system.scheme(), 'gs') + self.assertEqual(self.fs.scheme(), 'gs') self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs') def test_join(self): - file_system = gcsfilesystem.GCSFileSystem() self.assertEqual('gs://bucket/path/to/file', - file_system.join('gs://bucket/path', 'to', 'file')) + self.fs.join('gs://bucket/path', 'to', 'file')) self.assertEqual('gs://bucket/path/to/file', - file_system.join('gs://bucket/path', 'to/file')) + self.fs.join('gs://bucket/path', 'to/file')) self.assertEqual('gs://bucket/path/to/file', - file_system.join('gs://bucket/path', '/to/file')) + self.fs.join('gs://bucket/path', '/to/file')) self.assertEqual('gs://bucket/path/to/file', - file_system.join('gs://bucket/path/', 'to', 'file')) + self.fs.join('gs://bucket/path/', 'to', 'file')) self.assertEqual('gs://bucket/path/to/file', - file_system.join('gs://bucket/path/', 'to/file')) + self.fs.join('gs://bucket/path/', 'to/file')) self.assertEqual('gs://bucket/path/to/file', - file_system.join('gs://bucket/path/', '/to/file')) + self.fs.join('gs://bucket/path/', '/to/file')) with self.assertRaises(ValueError): - file_system.join('/bucket/path/', '/to/file') + self.fs.join('/bucket/path/', '/to/file') def test_split(self): - file_system = gcsfilesystem.GCSFileSystem() self.assertEqual(('gs://foo/bar', 'baz'), - file_system.split('gs://foo/bar/baz')) + self.fs.split('gs://foo/bar/baz')) self.assertEqual(('gs://foo', ''), - file_system.split('gs://foo/')) + self.fs.split('gs://foo/')) self.assertEqual(('gs://foo', ''), - file_system.split('gs://foo')) + 
self.fs.split('gs://foo')) with self.assertRaises(ValueError): - file_system.split('/no/gcs/prefix') + self.fs.split('/no/gcs/prefix') @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') def test_match_multiples(self, mock_gcsio): @@ -84,8 +86,7 @@ def test_match_multiples(self, mock_gcsio): FileMetadata('gs://bucket/file1', 1), FileMetadata('gs://bucket/file2', 2) ]) - file_system = gcsfilesystem.GCSFileSystem() - match_result = file_system.match(['gs://bucket/'])[0] + match_result = self.fs.match(['gs://bucket/'])[0] self.assertEqual( set(match_result.metadata_list), expected_results) @@ -104,8 +105,7 @@ def test_match_multiples_limit(self, mock_gcsio): expected_results = set([ FileMetadata('gs://bucket/file1', 1) ]) - file_system = gcsfilesystem.GCSFileSystem() - match_result = file_system.match(['gs://bucket/'], [limit])[0] + match_result = self.fs.match(['gs://bucket/'], [limit])[0] self.assertEqual( set(match_result.metadata_list), expected_results) @@ -124,10 +124,9 @@ def test_match_multiples_error(self, mock_gcsio): gcsio_mock.size_of_files_in_glob.side_effect = exception expected_results = {'gs://bucket/': exception} - file_system = gcsfilesystem.GCSFileSystem() with self.assertRaisesRegexp(BeamIOError, r'^Match operation failed') as error: - file_system.match(['gs://bucket/']) + self.fs.match(['gs://bucket/']) self.assertEqual(error.exception.exception_details, expected_results) gcsio_mock.size_of_files_in_glob.assert_called_once_with( 'gs://bucket/*', None) @@ -145,8 +144,7 @@ def test_match_multiple_patterns(self, mock_gcsio): [FileMetadata('gs://bucket/file1', 1)], [FileMetadata('gs://bucket/file2', 2)] ] - file_system = gcsfilesystem.GCSFileSystem() - result = file_system.match(['gs://bucket/file1*', 'gs://bucket/file2*']) + result = self.fs.match(['gs://bucket/file1*', 'gs://bucket/file2*']) self.assertEqual( [mr.metadata_list for mr in result], expected_results) @@ -157,8 +155,7 @@ def test_create(self, mock_gcsio): gcsio_mock = mock.MagicMock() 
gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock # Issue file copy - file_system = gcsfilesystem.GCSFileSystem() - _ = file_system.create('gs://bucket/from1', 'application/octet-stream') + _ = self.fs.create('gs://bucket/from1', 'application/octet-stream') gcsio_mock.open.assert_called_once_with( 'gs://bucket/from1', 'wb', mime_type='application/octet-stream') @@ -169,8 +166,7 @@ def test_open(self, mock_gcsio): gcsio_mock = mock.MagicMock() gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock # Issue file copy - file_system = gcsfilesystem.GCSFileSystem() - _ = file_system.open('gs://bucket/from1', 'application/octet-stream') + _ = self.fs.open('gs://bucket/from1', 'application/octet-stream') gcsio_mock.open.assert_called_once_with( 'gs://bucket/from1', 'rb', mime_type='application/octet-stream') @@ -184,8 +180,7 @@ def test_copy_file(self, mock_gcsio): destinations = ['gs://bucket/to1'] # Issue file copy - file_system = gcsfilesystem.GCSFileSystem() - file_system.copy(sources, destinations) + self.fs.copy(sources, destinations) gcsio_mock.copy.assert_called_once_with( 'gs://bucket/from1', 'gs://bucket/to1') @@ -205,10 +200,9 @@ def test_copy_file_error(self, mock_gcsio): expected_results = {(s, d):exception for s, d in zip(sources, destinations)} # Issue batch copy. - file_system = gcsfilesystem.GCSFileSystem() with self.assertRaisesRegexp(BeamIOError, r'^Copy operation failed') as error: - file_system.copy(sources, destinations) + self.fs.copy(sources, destinations) self.assertEqual(error.exception.exception_details, expected_results) gcsio_mock.copy.assert_called_once_with( @@ -223,8 +217,7 @@ def test_copy_tree(self, mock_gcsio): destinations = ['gs://bucket2/'] # Issue directory copy - file_system = gcsfilesystem.GCSFileSystem() - file_system.copy(sources, destinations) + self.fs.copy(sources, destinations) gcsio_mock.copytree.assert_called_once_with( 'gs://bucket1/', 'gs://bucket2/') @@ -256,8 +249,7 @@ def test_rename(self, mock_gcsio): ]] # Issue batch rename. 
- file_system = gcsfilesystem.GCSFileSystem() - file_system.rename(sources, destinations) + self.fs.rename(sources, destinations) gcsio_mock.copy_batch.assert_called_once_with([ ('gs://bucket/from1', 'gs://bucket/to1'), @@ -297,10 +289,9 @@ def test_rename_error(self, mock_gcsio): expected_results = {(s, d):exception for s, d in zip(sources, destinations)} # Issue batch rename. - file_system = gcsfilesystem.GCSFileSystem() with self.assertRaisesRegexp(BeamIOError, r'^Rename operation failed') as error: - file_system.rename(sources, destinations) + self.fs.rename(sources, destinations) self.assertEqual(error.exception.exception_details, expected_results) gcsio_mock.copy_batch.assert_called_once_with([ @@ -326,8 +317,7 @@ def test_delete(self, mock_gcsio): ] # Issue batch delete. - file_system = gcsfilesystem.GCSFileSystem() - file_system.delete(files) + self.fs.delete(files) gcsio_mock.delete_batch.assert_called() @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') @@ -345,9 +335,8 @@ def test_delete_error(self, mock_gcsio): expected_results = {f:exception for f in files} # Issue batch delete. - file_system = gcsfilesystem.GCSFileSystem() with self.assertRaisesRegexp(BeamIOError, r'^Delete operation failed') as error: - file_system.delete(files) + self.fs.delete(files) self.assertEqual(error.exception.exception_details, expected_results) gcsio_mock.delete_batch.assert_called() diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py index 9aad7f09dd15..a761068f413b 100644 --- a/sdks/python/apache_beam/io/hadoopfilesystem.py +++ b/sdks/python/apache_beam/io/hadoopfilesystem.py @@ -52,12 +52,12 @@ class HadoopFileSystem(FileSystem): Uses client library :class:`hdfs3.core.HDFileSystem`. """ - def __init__(self): + def __init__(self, pipeline_options): """Initializes a connection to HDFS. Connection configuration is done using :doc:`hdfs`. 
""" - super(HadoopFileSystem, self).__init__() + super(HadoopFileSystem, self).__init__(pipeline_options) self._hdfs_client = HDFileSystem() @classmethod diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py index af5bca86344d..8a1c0f1b3d29 100644 --- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py +++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py @@ -25,6 +25,7 @@ from apache_beam.io import hadoopfilesystem from apache_beam.io.filesystem import BeamIOError +from apache_beam.options.pipeline_options import PipelineOptions class FakeFile(StringIO.StringIO): @@ -166,7 +167,8 @@ class HadoopFileSystemTest(unittest.TestCase): def setUp(self): self._fake_hdfs = FakeHdfs() hadoopfilesystem.HDFileSystem = lambda *args, **kwargs: self._fake_hdfs - self.fs = hadoopfilesystem.HadoopFileSystem() + pipeline_options = PipelineOptions() + self.fs = hadoopfilesystem.HadoopFileSystem(pipeline_options) self.tmpdir = 'hdfs://test_dir' for filename in ['old_file1', 'old_file2']: diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py index 9bc1a0774c5e..29b68f61c34d 100644 --- a/sdks/python/apache_beam/io/localfilesystem_test.py +++ b/sdks/python/apache_beam/io/localfilesystem_test.py @@ -28,6 +28,7 @@ from apache_beam.io import localfilesystem from apache_beam.io.filesystem import BeamIOError +from apache_beam.options.pipeline_options import PipelineOptions def _gen_fake_join(separator): @@ -56,7 +57,8 @@ class LocalFileSystemTest(unittest.TestCase): def setUp(self): self.tmpdir = tempfile.mkdtemp() - self.fs = localfilesystem.LocalFileSystem() + pipeline_options = PipelineOptions() + self.fs = localfilesystem.LocalFileSystem(pipeline_options) def tearDown(self): shutil.rmtree(self.tmpdir) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 4c7d6012261e..d3d1932901d3 100644 --- 
a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -55,6 +55,7 @@ from apache_beam import pvalue from apache_beam.internal import pickler +from apache_beam.io.filesystems import FileSystems from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions @@ -124,6 +125,8 @@ def __init__(self, runner=None, options=None, argv=None): else: self._options = PipelineOptions([]) + FileSystems.set_options(self._options) + if runner is None: runner = self._options.view_as(StandardOptions).runner if runner is None: