Added rmdir (#19)
eterna2 committed Nov 1, 2020
1 parent b92708c commit 6179530
Showing 10 changed files with 182 additions and 45 deletions.
9 changes: 5 additions & 4 deletions CHANGELOG.md
@@ -2,10 +2,11 @@

## v0.1.0-rc-10

- Added `stats` and `unlink` as a required method
- Added `etag` as arg to stream constructor
- `S3Stream.iter_dir()` will now yield streams with etag
- Change signature of `AbcStream.iter_dir_()` to return `Iterable[StreamInfo]` instead
- Added `stats` and `unlink` abstract methods to `AbcStream`
- Added `rmdir`, `is_file`, `is_dir` methods to `AbcStream`
- Fix `glob` to correctly match both dir and filename
- `S3Stream.iter_dir()` will now yield streams with `StreamInfo`
- `AbcStream.iter_dir_()` now returns `Iterable[StreamInfo]` instead of `Iterable[str]`
- Refactored the internals of `AbcStream` to be cleaner

## v0.1.0-rc-9
20 changes: 12 additions & 8 deletions README.md
@@ -206,14 +206,15 @@ stream.close() # close the stream
`exists`, `mkdir`, `iter_dir` and `glob` are path-like methods that are available to the
stream object. These methods mimic their equivalents in `pathlib.Path` where appropriate.

| method | supported streams | desc |
| ---------------------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------ |
| `stats` | All Streams | return the StreamInfo for an existing resource |
| `unlink`, `delete`, `remove` | All Streams | Delete and remove the stream (except for `TempStream` where the buffer is cleared instead) |
| `exists` | All Streams | check if a stream points to an existing resource. |
| `mkdir` | `FileStream` | create a directory. |
| `iter_dir` | `FileStream`, `TempStream`, and `S3Stream` | iterate thru the streams in the directory. |
| `glob` | `FileStream`, `TempStream`, and `S3Stream` | iterate thru the streams in the directory that match a pattern. |
| method | supported streams | desc |
| ---------------------------- | ------------------------------------------- | ------------------------------------------------------------------------------------------ |
| `stats` | All Streams | return the StreamInfo for an existing resource |
| `unlink`, `delete`, `remove` | All Streams | Delete and remove the stream (except for `TempStream` where the buffer is cleared instead) |
| `exists` | All Streams | check if a stream points to an existing resource. |
| `mkdir` | `FileStream` | create a directory. |
| `rmdir` | `FileStream`, `TempStream`, and `S3Stream` | recursively remove everything in the directory. |
| `iter_dir` | `FileStream`, `TempStream`, and `S3Stream` | iterate thru the streams in the directory. |
| `glob` | `FileStream`, `TempStream`, and `S3Stream` | iterate thru the streams in the directory that match a pattern. |

```py
import itertools
@@ -249,6 +250,9 @@ print(info.extras)
# delete resource
unlink("s3://bucket/prefix/foo.txt")

# remove all keys with the given prefix
rmdir("s3://bucket/prefix/")

```

## Piping streams
20 changes: 12 additions & 8 deletions docs/streams.md
@@ -154,14 +154,15 @@ stream.close() # close the stream
`exists`, `mkdir`, `iter_dir` and `glob` are path-like methods that are available to the
stream object. These methods mimic their equivalents in `pathlib.Path` where appropriate.

| method | supported streams | desc |
| ---------------------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------ |
| `stats` | All Streams | return the StreamInfo for an existing resource |
| `unlink`, `delete`, `remove` | All Streams | Delete and remove the stream (except for `TempStream` where the buffer is cleared instead) |
| `exists` | All Streams | check if a stream points to an existing resource. |
| `mkdir` | `FileStream` | create a directory. |
| `iter_dir` | `FileStream`, `TempStream`, and `S3Stream` | iterate thru the streams in the directory. |
| `glob` | `FileStream`, `TempStream`, and `S3Stream` | iterate thru the streams in the directory that match a pattern. |
| method | supported streams | desc |
| ---------------------------- | ------------------------------------------- | ------------------------------------------------------------------------------------------ |
| `stats` | All Streams | return the StreamInfo for an existing resource |
| `unlink`, `delete`, `remove` | All Streams | Delete and remove the stream (except for `TempStream` where the buffer is cleared instead) |
| `exists` | All Streams | check if a stream points to an existing resource. |
| `mkdir` | `FileStream` | create a directory. |
| `rmdir` | `FileStream`, `TempStream`, and `S3Stream` | recursively remove everything in the directory. |
| `iter_dir` | `FileStream`, `TempStream`, and `S3Stream` | iterate thru the streams in the directory. |
| `glob` | `FileStream`, `TempStream`, and `S3Stream` | iterate thru the streams in the directory that match a pattern. |

```py
import itertools
@@ -197,4 +198,7 @@ print(info.extras)
# delete resource
unlink("s3://bucket/prefix/foo.txt")

# remove all keys with the given prefix
rmdir("s3://bucket/prefix/")

```
60 changes: 43 additions & 17 deletions iotoolz/_abc.py
@@ -421,9 +421,23 @@ def stats(self) -> StreamInfo:
self.set_info(self.stats_())
self._has_stats = True
except Exception as error: # pylint: disable=broad-except
logging.warning(error)
logging.warning("%s: %s", self.uri, error)
return self._info

def is_dir(self) -> bool:
"""Whether stream points to an existing dir."""
return self.exists() and self.uri.endswith("/")

def is_file(self) -> bool:
"""Whether stream points to an existing file."""
return self.exists() and not self.uri.endswith("/")

def rmdir(self, ignore_errors: bool = False, **kwargs) -> "AbcStream":
"""Remove the entire directory."""
for stream in self.iter_dir():
stream.unlink(missing_ok=ignore_errors, **kwargs)
return self

@need_sync
def peek(self, size: Optional[int] = None) -> bytes:
"""
@@ -559,31 +573,41 @@ def flush(self):
"""
self._file.flush()

def save(self) -> "AbcStream":
def save(
self, data: Optional[Union[bytes, bytearray, str]] = None, close: bool = False
) -> "AbcStream":
"""
Flush and stream everything in the buffer to the actual resource location.
Does nothing if mode is read-only. Does not close the stream unless `close` is True.
Args:
data (Union[bytes, bytearray, str], optional): Additional data to write to the stream before saving. Defaults to None.
close (bool, optional): Close stream after saving. Defaults to False.
Returns:
[AbcStream]: current stream object.
"""
if self.closed or self.is_empty():
if self.closed:
return self

if self.is_empty() and not data:
if close:
self.close()
return self

if "w" in self.mode or "a" in self.mode:
# remember current pos
pos = self.tell()
if data:
self.write(data)
self.flush()
# go to start of stream
self.seek(0)
info = self.write_from_fileobj_(
self.uri, self._file, self.size, **self._kwargs
)
if not self._file.closed:
self.set_info(info)
# restore org pos
self.seek(pos)
with peek_stream(self._file, peek=0, ignore_closed=True) as stream:
info = self.write_from_fileobj_(
self.uri, stream, self.size, **self._kwargs
)
self.set_info(info)

if close:
self.close()

return self

@@ -787,10 +811,12 @@ def glob(self, pattern: str = "*") -> Iterator["AbcStream"]:
Yields:
AbcStream: stream object
"""
if self.is_dir():
pattern = os.path.join(self.uri, pattern)
else:
pattern = os.path.join(os.path.dirname(self.uri), pattern)
return (
stream
for stream in self.iter_dir()
if fnmatch.fnmatch(os.path.basename(stream.uri), pattern)
stream for stream in self.iter_dir() if fnmatch.fnmatch(stream.uri, pattern)
)

def _emit(self, chunk: bytes, pipes: List[Tuple[str, IO]]):
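The `glob` change above matches the pattern against the full URI instead of the basename. A minimal sketch of that idea follows, assuming a trailing `/` marks a directory as in `AbcStream.is_dir`. The helper names `build_pattern` and `glob_uris` are hypothetical and not part of iotoolz; `posixpath` and `fnmatchcase` are used here only to keep the result platform-independent.

```python
import fnmatch
import posixpath
from typing import Iterable, List


def build_pattern(uri: str, pattern: str) -> str:
    """Anchor the pattern at the directory the URI refers to.

    Assumption: a trailing "/" means the URI is a directory; otherwise
    the parent directory of the URI is used.
    """
    base = uri if uri.endswith("/") else posixpath.dirname(uri)
    return posixpath.join(base, pattern)


def glob_uris(uri: str, uris: Iterable[str], pattern: str = "*") -> List[str]:
    """Keep the URIs whose full text matches the anchored pattern."""
    full_pattern = build_pattern(uri, pattern)
    return [u for u in uris if fnmatch.fnmatchcase(u, full_pattern)]


KEYS = [
    "s3://somebucket/foo/bar.txt",
    "s3://somebucket/foo/bar/text.txt",
    "s3://somebucket/hello_world.txt",
    "s3://somebucket/data.json",
]

# Matching on the basename alone can never satisfy a pattern that contains "/".
basename_matches = [
    u for u in KEYS if fnmatch.fnmatchcase(posixpath.basename(u), "*/*.txt")
]
```

Note that `*` in `fnmatch` also matches `/`, so `*.txt` anchored at the bucket root selects nested keys as well. This is consistent with the expectations in `test_s3stream_dirops`.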
28 changes: 27 additions & 1 deletion iotoolz/extensions/s3.py
@@ -15,6 +15,8 @@
"You can install boto3 by running the command: 'pip install iotoolz[boto3]'"
) from error

import cytoolz

from iotoolz._abc import AbcStream, StreamInfo
from iotoolz.utils import guess_filename

@@ -218,6 +220,14 @@ def exists(self) -> bool:
except botocore.errorfactory.ClientError:
return False

def is_dir(self) -> bool:
"""Whether stream points to an existing dir."""
return self.uri.endswith("/")

def is_file(self) -> bool:
"""Whether stream points to an existing file."""
return self.exists()

def unlink(self, missing_ok: bool = True, **kwargs):
try:
kwargs = {**self._delete_args, **kwargs}
@@ -228,6 +238,22 @@ def unlink(self, missing_ok: bool = True, **kwargs):
if not missing_ok:
raise

def rmdir(self, ignore_errors: bool = False, **kwargs) -> "S3Stream":
"""Remove the entire directory."""
try:
kwargs = {**self._delete_args, **kwargs}
batched = cytoolz.partition_all(1000, self.iter_dir())
for batch in batched:
self._client.delete_objects(
Bucket=self.bucket,
Delete={"Objects": [{"Key": stream.key} for stream in batch]},
**kwargs,
)
except botocore.errorfactory.ClientError:
if not ignore_errors:
raise
return self

@classmethod
def set_default_client(cls, client: boto3.client) -> Type["S3Stream"]:
"""
@@ -323,7 +349,7 @@ def mkdir(
def iter_dir_(self) -> Iterable[StreamInfo]:
"""Yields tuple of uri and the metadata in a directory."""
continuation_token: str = ""
if self.key.endswith("/"):
if self.is_dir():
prefix = self.key
else:
prefix = os.path.dirname(self.key)
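`S3Stream.rmdir` above deletes keys in batches of 1000, which is the per-request limit of the S3 `DeleteObjects` operation. The batching can be sketched without `cytoolz` or a real client, as below. `FakeClient`, `batches`, and `delete_prefix` are hypothetical names for illustration only. The sketch also collects the `Errors` entries of each response, on the assumption that per-key failures are reported there rather than raised as exceptions.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List

MAX_KEYS_PER_REQUEST = 1000  # per-request limit of S3 DeleteObjects


def batches(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield lists of at most `size` items, consuming the iterable lazily."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def delete_prefix(client: Any, bucket: str, keys: Iterable[str]) -> List[Dict[str, Any]]:
    """Delete keys in batches and return any per-key errors that were reported."""
    failures: List[Dict[str, Any]] = []
    for batch in batches(keys, MAX_KEYS_PER_REQUEST):
        response = client.delete_objects(
            Bucket=bucket,
            Delete={"Objects": [{"Key": key} for key in batch]},
        )
        failures.extend(response.get("Errors", []))
    return failures


class FakeClient:
    """Hypothetical stand-in for a boto3 S3 client; records batch sizes only."""

    def __init__(self) -> None:
        self.batch_sizes: List[int] = []

    def delete_objects(self, Bucket: str, Delete: Dict[str, Any]) -> Dict[str, Any]:
        self.batch_sizes.append(len(Delete["Objects"]))
        return {"Deleted": Delete["Objects"]}
```

With 2500 keys, the fake client would see three requests of 1000, 1000, and 500 keys.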
42 changes: 39 additions & 3 deletions iotoolz/extensions/s3_test.py
@@ -57,9 +57,45 @@ def test_s3stream(s3):
{"Key": "key2", "Value": "value2"},
]

assert list(S3Stream("s3://somebucket/foo/").iter_dir()) == [
S3Stream("s3://somebucket/foo/bar.txt")
]

def test_s3stream_pathops(s3):
from iotoolz.extensions.s3 import S3Stream

s3.create_bucket(Bucket="somebucket")
S3Stream("s3://somebucket/foo/bar.txt", "w").save("foobar", close=True)

assert S3Stream("s3://somebucket/foo/bar.txt").exists()
assert S3Stream("s3://somebucket/foo/bar.txt").is_file()
S3Stream("s3://somebucket/foo/bar.txt").unlink()
assert not S3Stream("s3://somebucket/foo/bar.txt").exists()


def test_s3stream_dirops(s3):
from iotoolz.extensions.s3 import S3Stream

s3.create_bucket(Bucket="somebucket")
S3Stream("s3://somebucket/foo/bar.txt", "w").save("foobar", close=True)
S3Stream("s3://somebucket/foo/bar/text.txt", "w").save("foobar2", close=True)
S3Stream("s3://somebucket/hello_world.txt", "w").save("hello", close=True)
S3Stream("s3://somebucket/data.json", "w").save("{}", close=True)

assert S3Stream("s3://somebucket/foo/").is_dir()
assert list(S3Stream("s3://somebucket/foo/").iter_dir()) == [
S3Stream("s3://somebucket/foo/bar.txt"),
S3Stream("s3://somebucket/foo/bar/text.txt"),
]
assert list(S3Stream("s3://somebucket/").glob("*/*.txt")) == [
S3Stream("s3://somebucket/foo/bar.txt"),
S3Stream("s3://somebucket/foo/bar/text.txt"),
]
assert list(S3Stream("s3://somebucket/").glob("*.txt")) == [
S3Stream("s3://somebucket/foo/bar.txt"),
S3Stream("s3://somebucket/foo/bar/text.txt"),
S3Stream("s3://somebucket/hello_world.txt"),
]

S3Stream("s3://somebucket/foo/").rmdir()
assert list(S3Stream("s3://somebucket/").glob("*.txt")) == [
S3Stream("s3://somebucket/hello_world.txt"),
]
14 changes: 14 additions & 0 deletions iotoolz/file.py
@@ -102,6 +102,20 @@ def unlink(self, missing_ok: bool = True, **kwargs):
if not missing_ok:
raise

def is_dir(self) -> bool:
"""Whether stream points to an existing dir."""
return pathlib.Path(self.uri).is_dir()

def is_file(self) -> bool:
"""Whether stream points to an existing file."""
return pathlib.Path(self.uri).is_file()

def rmdir(self, ignore_errors: bool = False, **kwargs) -> "FileStream":
"""Remove the entire directory."""
shutil.rmtree(self.uri, ignore_errors=ignore_errors, **kwargs)
return self

def mkdir(
self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False,
):
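`FileStream.rmdir` above delegates to `shutil.rmtree`, so `ignore_errors` carries the standard-library meaning. The sketch below illustrates that behavior on a temporary directory. `remove_tree` and `demo` are hypothetical helpers, not iotoolz functions.

```python
import pathlib
import shutil
import tempfile
from typing import Dict, Union


def remove_tree(path: Union[str, pathlib.Path], ignore_errors: bool = False) -> None:
    """Recursively remove a directory, mirroring the rmtree call in the diff."""
    shutil.rmtree(path, ignore_errors=ignore_errors)


def demo() -> Dict[str, bool]:
    root = pathlib.Path(tempfile.mkdtemp())
    target = root / "foo"
    (target / "nested").mkdir(parents=True)
    (target / "nested" / "abc.txt").write_text("hello")

    remove_tree(target)  # removes the directory and everything below it
    removed = not target.exists()

    remove_tree(target, ignore_errors=True)  # missing path is silently ignored
    try:
        remove_tree(target)  # missing path raises when errors are not ignored
        raised = False
    except FileNotFoundError:
        raised = True

    shutil.rmtree(root)  # clean up the temporary root
    return {"removed": removed, "raised_when_missing": raised}
```

One consequence is that calling `rmdir` on a missing directory with the default `ignore_errors=False` would raise, unlike `unlink`, whose `missing_ok` defaults to `True`.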
13 changes: 13 additions & 0 deletions iotoolz/streams.py
@@ -311,6 +311,18 @@ def unlink(
else:
self.open(uri).unlink(missing_ok=missing_ok, **kwargs)

def rmdir(
self,
uri: Union[pathlib.Path, str, AbcStream],
ignore_errors: bool = False,
**kwargs,
):
"""Remove the entire directory."""
if isinstance(uri, AbcStream):
uri.rmdir(ignore_errors=ignore_errors, **kwargs)
else:
self.open(uri).rmdir(ignore_errors=ignore_errors, **kwargs)

@classmethod
def set_buffer_rollover_size(cls, value: int):
"""
@@ -334,3 +346,4 @@ def set_buffer_rollover_size(cls, value: int):
exists = stream_factory.exists
stats = stream_factory.stats
unlink = delete = remove = stream_factory.unlink
rmdir = stream_factory.rmdir
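The module-level `rmdir` is a bound method of the shared factory and accepts either a URI or an existing stream. A toy version of that dispatch is sketched below. `ToyStream` and `ToyFactory` are hypothetical stand-ins for `AbcStream` and the real factory. Unlike the method in the diff, this sketch returns the stream, purely to make the behavior easy to inspect.

```python
import pathlib
from typing import List, Union


class ToyStream:
    """Hypothetical stand-in for an AbcStream subclass."""

    def __init__(self, uri: str) -> None:
        self.uri = uri
        self.removed = False

    def rmdir(self, ignore_errors: bool = False) -> "ToyStream":
        self.removed = True  # a real stream would delete its contents here
        return self


class ToyFactory:
    """Hypothetical stand-in for the stream factory."""

    def __init__(self) -> None:
        self.opened: List[str] = []

    def open(self, uri: Union[pathlib.Path, str]) -> ToyStream:
        self.opened.append(str(uri))
        return ToyStream(str(uri))

    def rmdir(
        self,
        uri: Union[pathlib.Path, str, ToyStream],
        ignore_errors: bool = False,
    ) -> ToyStream:
        # Reuse an existing stream as-is; open anything else first.
        stream = uri if isinstance(uri, ToyStream) else self.open(uri)
        return stream.rmdir(ignore_errors=ignore_errors)


factory = ToyFactory()
rmdir = factory.rmdir  # module-level alias bound to the shared factory
```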
8 changes: 8 additions & 0 deletions iotoolz/streams_test.py
@@ -14,6 +14,7 @@
iter_dir,
mkdir,
open_stream,
rmdir,
set_buffer_rollover_size,
set_schema_kwargs,
stats,
@@ -71,6 +72,13 @@ def test_streams(tmpdir):
unlink(Stream(filepath))
assert not exists(Stream(filepath))

# rmdir
mkdir(dirpath / "foo")
open_stream(dirpath / "foo" / "abc.txt", "w").save("hello", close=True)
assert len(list(iter_dir(dirpath / "foo"))) > 0
rmdir(dirpath / "foo")
assert len(list(iter_dir(dirpath))) == 0


def test_buffer_rollover(tmpdir):

13 changes: 9 additions & 4 deletions iotoolz/utils.py
@@ -3,18 +3,21 @@
"""
import contextlib
import functools
import io
import os.path
from typing import Iterable, Iterator, Optional, Tuple, TypeVar
from typing import IO, Any, Iterable, Iterator, Optional, Tuple, TypeVar

import cytoolz
import magic
from chardet.universaldetector import UniversalDetector

T = TypeVar("T")
T = TypeVar("T", io.IOBase, IO, Any)


@contextlib.contextmanager
def peek_stream(stream: T, peek: Optional[int] = None) -> Iterator[T]:
def peek_stream(
stream: T, peek: Optional[int] = None, ignore_closed: bool = True
) -> Iterator[T]:
"""
Context manager to restore the stream position when exiting the context.
@@ -24,6 +27,7 @@ def peek_stream(stream: T, peek: Optional[int] = None) -> Iterator[T]:
Args:
stream (T): Any stream object with the seek and tell method.
peek (Optional[int], optional): stream position to start at when entering the context. Defaults to None.
ignore_closed (bool, optional): do not restore to position if file is already closed. Defaults to True.
Raises:
TypeError: stream is not seekable.
@@ -39,7 +43,8 @@ def peek_stream(stream: T, peek: Optional[int] = None) -> Iterator[T]:
stream.seek(peek) # type: ignore
yield stream
finally:
stream.seek(pos) # type: ignore
if not (ignore_closed and stream.closed):
stream.seek(pos) # type: ignore


def guess_encoding(
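The `ignore_closed` flag added to `peek_stream` lets the context exit cleanly when the wrapped file was closed inside the `with` block, which can happen when `save` hands the buffer to a writer that closes it. The following is a standalone sketch of the same pattern, not the library function itself. The name `peek` and its `start` parameter are chosen here for illustration.

```python
import contextlib
import io
from typing import IO, Iterator, Optional


@contextlib.contextmanager
def peek(
    stream: IO, start: Optional[int] = None, ignore_closed: bool = True
) -> Iterator[IO]:
    """Restore the stream position on exit, unless the stream was closed."""
    pos = stream.tell()  # remember where the caller was
    try:
        if start is not None:
            stream.seek(start)
        yield stream
    finally:
        # Seeking a closed stream raises ValueError, so skip it when allowed.
        if not (ignore_closed and stream.closed):
            stream.seek(pos)


def read_head(buffer: io.BytesIO, size: int) -> bytes:
    """Read from the start of the buffer without moving the caller's position."""
    with peek(buffer, 0) as stream:
        return stream.read(size)
```

With `ignore_closed=False`, closing the stream inside the block would surface the `ValueError` from `seek` on exit.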
