Skip to content

Conversation

@udim
Copy link
Member

@udim udim commented Apr 4, 2018

Adds glob support for HDFS.


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand:
    • What the pull request does
    • Why it does it
    • How it does it
    • Why this approach
  • Each commit in the pull request should have a meaningful subject line and body.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@udim
Copy link
Member Author

udim commented Apr 4, 2018

Do not merge before #4979

@udim udim force-pushed the filesystem-match branch 2 times, most recently from bd9b398 to a39b2b8 Compare April 5, 2018 23:39
@udim udim changed the title [BEAM-4011] Normalize Filesystems.match() glob behavior. [BEAM-4011] Unify Python IO glob implementation. Apr 5, 2018
@udim udim force-pushed the filesystem-match branch from a39b2b8 to 7b40fed Compare April 6, 2018 00:53
@udim
Copy link
Member Author

udim commented Apr 6, 2018

run python postcommit

@udim udim force-pushed the filesystem-match branch from 7b40fed to 6cbd3fa Compare April 6, 2018 17:03
@udim
Copy link
Member Author

udim commented Apr 6, 2018

This code is working. I've opened BEAM-4027 for the precommit failure with cython.

@udim
Copy link
Member Author

udim commented Apr 6, 2018

R: @chamikaramj

@udim udim force-pushed the filesystem-match branch from 6cbd3fa to 764c3f6 Compare April 10, 2018 18:32
@udim
Copy link
Member Author

udim commented Apr 11, 2018

retest this please

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

"""Find all matching paths to the pattern provided."""
if pattern.endswith('/'):
pattern += '*'
prefix_or_dir = re.match('^[^[*?]*', pattern).group(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment explaining this regex.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if self.exists(pattern):
file_metadatas = [FileMetadata(pattern, self.size(pattern))]
else:
if self.has_dirs():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't sure from code, but do we try to list "<path>/*" for "<path>/<prefix>*" ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I explained it in the new comment a little above this line.

raise NotImplementedError

@abc.abstractmethod
def size(self, path):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a separate method for size() ? I think we can already stat files using the match() method (it returns FileMetada objects).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_list() returns sizes, but it only works on prefixes (may return more than one result) or directories (fails on files).
I added this method for the case where the pattern given doesn't end in / and has no globbing characters. In this case we return the size of the file or directory pointed to by the pattern (if it exists).

raise NotImplementedError

@abc.abstractmethod
def list(self, dir_or_prefix):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having both list() and match() as public can be confusing to users. Let's keep list() as private (and move it to filesystem implementations) if there's no compelling use-case to keep it in the interface.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

def match(self, patterns, limits=None):
"""Find all matching paths to the patterns provided.
Pattern matching is done using fnmatch.fnmatch.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should clarify if match for <path>/* is recursive or not. I think most users will use a pattern in the form <path>/<prefix>* which to avoid matching all sub-directories anyways.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarified.

- Introduces FileSystem._list() abstract method. Lists a directory or
prefix.
- Implement FileSystem.match() - no longer abstract, unifies glob
behavior using fnmatch.fnmatch.
@udim udim force-pushed the filesystem-match branch from 764c3f6 to 36a2506 Compare April 13, 2018 01:30
Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

@chamikaramj chamikaramj merged commit 6782e87 into apache:master Apr 17, 2018
@knub
Copy link

knub commented Jul 16, 2018

@udim @chamikaramj
Hi there!

first of all, thanks for your work in Apache Beam!

I have a problem: I just updated from apache_beam==2.4.0 to apache_beam==2.5.0.
Previously, I was using:

gcsio.GcsIO().glob(glob_string)

for queries against GCS. Now this previously public interface does not work anymore in apache_beam==2.5.0.
After doing some digging, I found this PR which seems related. I can't find any documentation/update guide on how to migrate from the former to the latter version and also couldn't find out from reading the code. Can you help me?

@udim
Copy link
Member Author

udim commented Jul 18, 2018

Hi @knub !
First of all, I apologize for breaking the API. In the future we will label deprecated methods as such before removing them, and document a migration path.

Calling Filesystems.match() should work as a drop-in replacement.

@knub
Copy link

knub commented Jul 19, 2018

Hi udim,

thanks for your quick response! For future reference, here's the full replacement with the same interface as before:

from apache_beam.io.filesystems import FileSystems

def glob(path):
    match_result = FileSystems.match([path])[0]
    file_metadata_objects = match_result.metadata_list
    return [fm.path for fm in file_metadata_objects]

Side-note: I think there's a bug in the FileSystems.match implementation for certain globs, see the following snippet:

In [1]: from apache_beam.io.filesystems import FileSystems

In [2]: FileSystems.match(["gs://<bucket name>/*"])
/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/gcsio.py:176: DeprecationWarning: object() takes no parameters
  super(GcsIO, cls).__new__(cls, storage_client))
WARNING:root:Retry with exponential backoff: waiting for 4.3505648468 seconds before retrying list_prefix because we caught exception: ValueError: GCS path must be in the form gs://<bucket>/<object>.
 Traceback for above exception (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 180, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/gcsio.py", line 423, in list_prefix
    bucket, prefix = parse_gcs_path(path)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/gcsio.py", line 147, in parse_gcs_path
    raise ValueError('GCS path must be in the form gs://<bucket>/<object>.')

@udim udim deleted the filesystem-match branch September 24, 2018 20:22
@udim
Copy link
Member Author

udim commented Sep 24, 2018

@knub , I filed a bug for your side-note. Thanks for reporting!
https://issues.apache.org/jira/browse/BEAM-5486

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants