Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ def join(self, basepath, *paths):
raise ValueError('Basepath %r must be GCS path.', basepath)
path = basepath
for p in paths:
if path == '' or path.endswith('/'):
path += p
else:
path += '/' + p
path = path.rstrip('/') + '/' + p.lstrip('/')
return path

def mkdirs(self, path):
Expand Down
10 changes: 9 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,16 @@ def test_join(self):
file_system.join('gs://bucket/path', 'to', 'file'))
self.assertEqual('gs://bucket/path/to/file',
file_system.join('gs://bucket/path', 'to/file'))
self.assertEqual('gs://bucket/path//to/file',
self.assertEqual('gs://bucket/path/to/file',
file_system.join('gs://bucket/path', '/to/file'))
self.assertEqual('gs://bucket/path/to/file',
file_system.join('gs://bucket/path/', 'to', 'file'))
self.assertEqual('gs://bucket/path/to/file',
file_system.join('gs://bucket/path/', 'to/file'))
self.assertEqual('gs://bucket/path/to/file',
file_system.join('gs://bucket/path/', '/to/file'))
with self.assertRaises(ValueError):
file_system.join('/bucket/path/', '/to/file')

@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiples(self, mock_gcsio):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
from apitools.base.py import encoding
from apitools.base.py import exceptions

from apache_beam import utils
from apache_beam.internal.gcp.auth import get_service_credentials
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io.filesystems_util import get_filesystem
from apache_beam.io.gcp.internal.clients import storage
from apache_beam.runners.dataflow.internal import dependency
from apache_beam.runners.dataflow.internal.clients import dataflow
Expand Down Expand Up @@ -336,10 +336,12 @@ def __init__(self, options):
# for GCS staging locations where the potential for such clashes is high.
if self.google_cloud_options.staging_location.startswith('gs://'):
path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time())
self.google_cloud_options.staging_location = utils.path.join(
filesystem = get_filesystem(self.google_cloud_options.staging_location)
self.google_cloud_options.staging_location = filesystem.join(
self.google_cloud_options.staging_location, path_suffix)
self.google_cloud_options.temp_location = utils.path.join(
self.google_cloud_options.temp_location = filesystem.join(
Copy link
Member

@aaltay aaltay Apr 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if temp_location is not a GCS location? It is an error by itself, but the join part was handled correctly in the previous code.

I wonder if there is any value in something like bfs.join where bfs would call the right join method for the given type, avoiding calls to get_file_system.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think Cham and I discussed it at some point. I can work on that in another PR and clean up the get_filesystem infiltration everywhere.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you.

self.google_cloud_options.temp_location, path_suffix)

self.proto = dataflow.Job(name=self.google_cloud_options.job_name)
if self.options.view_as(StandardOptions).streaming:
self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING
Expand Down
23 changes: 13 additions & 10 deletions sdks/python/apache_beam/runners/dataflow/internal/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@
import sys
import tempfile


from apache_beam import utils
from apache_beam import version as beam_version
from apache_beam.internal import pickler
from apache_beam.io.filesystems_util import get_filesystem
from apache_beam.runners.dataflow.internal import names
from apache_beam.utils import processes
from apache_beam.utils.pipeline_options import GoogleCloudOptions
Expand Down Expand Up @@ -158,6 +157,7 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
name patterns.
"""
resources = []
staging_filesystem = get_filesystem(staging_location)
staging_temp_dir = None
local_packages = []
for package in extra_packages:
Expand Down Expand Up @@ -190,13 +190,14 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
local_packages.append(package)

if staging_temp_dir:
temp_fs = get_filesystem(staging_temp_dir)
local_packages.extend(
[utils.path.join(staging_temp_dir, f) for f in os.listdir(
[temp_fs.join(staging_temp_dir, f) for f in os.listdir(
staging_temp_dir)])

for package in local_packages:
basename = os.path.basename(package)
staged_path = utils.path.join(staging_location, basename)
staged_path = staging_filesystem.join(staging_location, basename)
file_copy(package, staged_path)
resources.append(basename)
# Create a file containing the list of extra packages and stage it.
Expand All @@ -209,7 +210,7 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
for package in local_packages:
f.write('%s\n' % os.path.basename(package))
staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE)
staged_path = staging_filesystem.join(staging_location, EXTRA_PACKAGES_FILE)
# Note that the caller of this function is responsible for deleting the
# temporary folder where all temp files are created, including this one.
file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
Expand Down Expand Up @@ -284,13 +285,15 @@ def stage_job_resources(
raise RuntimeError(
'The --temp_location option must be specified.')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we check for temp_location here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll investigate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. This does not need to happen in this PR if it is involved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a sanity check as https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L325 would have already assigned temp_location to staging_location had it not been specified. I think we can leave it as is right now.


filesystem = get_filesystem(google_cloud_options.staging_location)

# Stage a requirements file if present.
if setup_options.requirements_file is not None:
if not os.path.isfile(setup_options.requirements_file):
raise RuntimeError('The file %s cannot be found. It was specified in the '
'--requirements_file command line option.' %
setup_options.requirements_file)
staged_path = utils.path.join(google_cloud_options.staging_location,
staged_path = filesystem.join(google_cloud_options.staging_location,
REQUIREMENTS_FILE)
file_copy(setup_options.requirements_file, staged_path)
resources.append(REQUIREMENTS_FILE)
Expand All @@ -305,7 +308,7 @@ def stage_job_resources(
populate_requirements_cache(
setup_options.requirements_file, requirements_cache_path)
for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
file_copy(pkg, utils.path.join(google_cloud_options.staging_location,
file_copy(pkg, filesystem.join(google_cloud_options.staging_location,
os.path.basename(pkg)))
resources.append(os.path.basename(pkg))

Expand All @@ -324,7 +327,7 @@ def stage_job_resources(
'setup.py instead of %s' % setup_options.setup_file)
tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
build_setup_args)
staged_path = utils.path.join(google_cloud_options.staging_location,
staged_path = filesystem.join(google_cloud_options.staging_location,
WORKFLOW_TARBALL_FILE)
file_copy(tarball_file, staged_path)
resources.append(WORKFLOW_TARBALL_FILE)
Expand All @@ -344,7 +347,7 @@ def stage_job_resources(
pickled_session_file = os.path.join(temp_dir,
names.PICKLED_MAIN_SESSION_FILE)
pickler.dump_session(pickled_session_file)
staged_path = utils.path.join(google_cloud_options.staging_location,
staged_path = filesystem.join(google_cloud_options.staging_location,
names.PICKLED_MAIN_SESSION_FILE)
file_copy(pickled_session_file, staged_path)
resources.append(names.PICKLED_MAIN_SESSION_FILE)
Expand All @@ -359,7 +362,7 @@ def stage_job_resources(
else:
stage_tarball_from_remote_location = False

staged_path = utils.path.join(google_cloud_options.staging_location,
staged_path = filesystem.join(google_cloud_options.staging_location,
names.DATAFLOW_SDK_TARBALL_FILE)
if stage_tarball_from_remote_location:
# If --sdk_location is not specified then the appropriate package
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import tempfile
import unittest

from apache_beam import utils
from apache_beam.io.filesystems_util import get_filesystem
from apache_beam.runners.dataflow.internal import dependency
from apache_beam.runners.dataflow.internal import names
from apache_beam.utils.pipeline_options import GoogleCloudOptions
Expand Down Expand Up @@ -241,7 +241,8 @@ def override_file_copy(self, expected_from_path, expected_to_dir):
def file_copy(from_path, to_path):
if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE):
self.assertEqual(expected_from_path, from_path)
self.assertEqual(utils.path.join(expected_to_dir,
filesystem = get_filesystem(expected_to_dir)
self.assertEqual(filesystem.join(expected_to_dir,
names.DATAFLOW_SDK_TARBALL_FILE),
to_path)
if from_path.startswith('gs://') or to_path.startswith('gs://'):
Expand Down
4 changes: 0 additions & 4 deletions sdks/python/apache_beam/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,3 @@
#

"""A package containing utilities."""

# We must import path here to support the pattern of referencing utils.path
# without needing to explicitly import apache_beam.utils.path.
import path
46 changes: 0 additions & 46 deletions sdks/python/apache_beam/utils/path.py

This file was deleted.

70 changes: 0 additions & 70 deletions sdks/python/apache_beam/utils/path_test.py

This file was deleted.